printer on atproto
4
fork

Configure Feed

Select the types of activity you want to include in your feed.

use multiple jetstream urls

dawn c2db6c8a 0b46613f

+17 -7
+17 -7
src/main.rs
··· 15 15 mod resolver; 16 16 17 17 const LEX: &str = "net.klbr.printer.job"; 18 - const JETSTREAM: &str = "wss://jetstream2.fr.hose.cam"; 18 + const JETSTREAM: [&str; 3] = [ 19 + "wss://jetstream2.fr.hose.cam", 20 + "wss://jetstream2.us-east.bsky.network", 21 + "wss://jetstream2.us-west.bsky.network", 22 + ]; 19 23 20 24 #[derive(Deserialize)] 21 25 struct Event { ··· 91 95 resolver.resolve(did).await?; 92 96 } 93 97 94 - let mut url = format!("{JETSTREAM}/subscribe?wantedCollections={LEX}"); 98 + let mut jetstream_query = format!("wantedCollections={LEX}"); 95 99 for did in &allowed_dids { 96 - url.push_str(&format!("&wantedDids={did}")); 100 + jetstream_query.push_str(&format!("&wantedDids={did}")); 97 101 } 98 102 99 103 let (task_tx, task_rx) = mpsc::channel(16); 100 104 let job_handler = tokio::task::spawn_blocking(move || handle_jobs(printer, task_rx)); 101 - let job_stream = tokio::spawn(stream_jobs(url, task_tx, resolver)); 105 + let job_stream = tokio::spawn(stream_jobs(jetstream_query, task_tx, resolver)); 102 106 103 107 tokio::select! { 104 108 r = job_stream => r.into_diagnostic().flatten(), ··· 106 110 } 107 111 } 108 112 109 - async fn stream_jobs(url: String, task_tx: TaskTx, resolver: Arc<Resolver>) -> Result<()> { 113 + async fn stream_jobs(query: String, task_tx: TaskTx, resolver: Arc<Resolver>) -> Result<()> { 114 + let mut jetstream_idx = 0; 110 115 loop { 116 + let jetstream = JETSTREAM[jetstream_idx]; 117 + let url = format!("{jetstream}/subscribe?{query}"); 111 118 match run_stream(&url, &task_tx, &resolver).await { 112 - Ok(()) => tracing::warn!("jetstream disconnected, reconnecting..."), 113 - Err(e) => tracing::warn!(err = %e, "jetstream error, reconnecting..."), 119 + Ok(()) => tracing::warn!("{url} disconnected, reconnecting..."), 120 + Err(e) => { 121 + tracing::warn!(err = %e, "{url} error, reconnecting..."); 122 + jetstream_idx = (jetstream_idx + 1) % JETSTREAM.len(); 123 + } 114 124 } 115 125 tokio::time::sleep(Duration::from_secs(5)).await; 116 126 }