printer on atproto
4
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 271 lines 8.0 kB view raw
1use std::sync::Arc; 2use std::time::Duration; 3 4use chrono::{DateTime, Utc}; 5use futures_util::StreamExt; 6use miette::{IntoDiagnostic, Result}; 7use serde::{Deserialize, Serialize}; 8use tokio::sync::mpsc; 9use tokio_tungstenite::tungstenite::Message; 10 11use crate::printer::{Printer, PrinterEndpoint}; 12use crate::resolver::Resolver; 13 14mod printer; 15mod resolver; 16 17const LEX: &str = "net.klbr.spool.job"; 18const JETSTREAM: [&str; 3] = [ 19 "wss://jetstream2.fr.hose.cam", 20 "wss://jetstream2.us-east.bsky.network", 21 "wss://jetstream2.us-west.bsky.network", 22]; 23 24#[derive(Deserialize)] 25struct Event { 26 #[serde(rename = "time_us")] 27 cursor: u64, 28 did: String, 29 kind: String, 30 commit: Option<Commit>, 31} 32 33#[derive(Deserialize)] 34struct Commit { 35 collection: String, 36 operation: String, 37 record: Option<Box<serde_json::value::RawValue>>, 38} 39 40#[derive(Deserialize, Serialize, Debug)] 41#[serde(rename = "net.klbr.spool.job", rename_all = "camelCase", tag = "$type")] 42struct Job { 43 content: JobContent, 44} 45 46#[derive(Deserialize, Serialize, Debug)] 47#[serde(tag = "$type")] 48enum JobContent { 49 #[serde(rename = "net.klbr.spool.job.content.text")] 50 Text(TextContent), 51} 52 53impl JobContent { 54 fn kind(&self) -> &'static str { 55 match self { 56 JobContent::Text(_) => "text", 57 } 58 } 59} 60 61#[derive(Deserialize, Serialize, Debug)] 62#[serde(rename_all = "camelCase")] 63struct TextContent { 64 text: String, 65} 66 67struct Task { 68 date: DateTime<Utc>, 69 identifier: String, 70 job: Job, 71} 72 73type TaskTx = mpsc::Sender<Task>; 74type TaskRx = mpsc::Receiver<Task>; 75 76#[tokio::main] 77async fn main() -> Result<()> { 78 tracing_subscriber::fmt().init(); 79 80 let printer = Printer::connect( 81 PrinterEndpoint::Usb { 82 vendor_id: 0x0416, 83 product_id: 0x5011, 84 }, 85 Default::default(), 86 )?; 87 88 let password = std::env::var("SPOOL_PASSWORD").ok(); 89 90 let allowed_dids: Vec<String> = tokio::fs::read_to_string("allowed.txt") 91 .await 92 .into_diagnostic()? 93 .lines() 94 .map(|l| l.find('#').map_or(l, |i| &l[..i]).trim().to_string()) 95 .filter(|l| !l.is_empty()) 96 .collect(); 97 98 // prewarm identity cache 99 let resolver = Arc::new(Resolver::new()); 100 for did in &allowed_dids { 101 resolver.resolve(did).await?; 102 } 103 104 let mut jetstream_query = format!("wantedCollections={LEX}"); 105 for did in &allowed_dids { 106 jetstream_query.push_str(&format!("&wantedDids={did}")); 107 } 108 109 let (task_tx, task_rx) = mpsc::channel(16); 110 let job_handler = tokio::task::spawn_blocking(move || handle_jobs(printer, task_rx)); 111 let job_stream = tokio::spawn(stream_jobs(jetstream_query, task_tx.clone(), resolver)); 112 let api = tokio::spawn(serve_api(password, task_tx)); 113 114 tokio::select! { 115 r = job_stream => r.into_diagnostic().flatten(), 116 r = job_handler => r.into_diagnostic().flatten(), 117 r = api => r.into_diagnostic().flatten(), 118 } 119} 120 121async fn stream_jobs(query: String, task_tx: TaskTx, resolver: Arc<Resolver>) -> Result<()> { 122 let mut jetstream_idx = 0; 123 loop { 124 let jetstream = JETSTREAM[jetstream_idx]; 125 let url = format!("{jetstream}/subscribe?{query}"); 126 match run_stream(&url, &task_tx, &resolver).await { 127 Ok(()) => tracing::warn!("{url} disconnected, reconnecting..."), 128 Err(e) => { 129 tracing::warn!(err = %e, "{url} error, reconnecting..."); 130 jetstream_idx = (jetstream_idx + 1) % JETSTREAM.len(); 131 } 132 } 133 tokio::time::sleep(Duration::from_secs(5)).await; 134 } 135} 136 137async fn run_stream( 138 url: &str, 139 task_tx: &TaskTx, 140 resolver: &Resolver, 141) -> Result<(), tokio_tungstenite::tungstenite::Error> { 142 let (mut ws, _) = tokio_tungstenite::connect_async(url).await?; 143 while let Some(msg) = ws.next().await { 144 let text = match msg? { 145 Message::Text(t) => t, 146 _ => continue, 147 }; 148 let event: Event = match serde_json::from_str(&text) { 149 Ok(e) => e, 150 Err(e) => { 151 tracing::warn!(err = %e, "failed to parse event"); 152 continue; 153 } 154 }; 155 156 // invalidate identity cache if needed 157 if event.kind == "identity" { 158 resolver.invalidate(&event.did).await; 159 continue; 160 } 161 162 // handle records 163 let Some(commit) = event.commit else { continue }; 164 if commit.operation != "create" || commit.collection != LEX { 165 continue; 166 } 167 let Some(record_raw) = commit.record else { 168 continue; 169 }; 170 let job: Job = match serde_json::from_str(record_raw.get()) { 171 Ok(j) => j, 172 Err(e) => { 173 tracing::warn!(err = %e, "invalid record"); 174 continue; 175 } 176 }; 177 let identifier = match resolver.resolve(&event.did).await { 178 Ok(h) => h, 179 Err(e) => { 180 tracing::error!(err = %e, "can't resolve did"); 181 resolver 182 .cached(&event.did) 183 .await 184 .unwrap_or_else(|| event.did.clone()) 185 } 186 }; 187 let date = DateTime::from_timestamp_micros(event.cursor as i64).unwrap_or_else(Utc::now); 188 let _ = task_tx 189 .send(Task { 190 identifier, 191 job, 192 date, 193 }) 194 .await; 195 } 196 Ok(()) 197} 198 199fn handle_jobs(mut printer: Printer, mut task_rx: TaskRx) -> Result<()> { 200 while let Some(task) = task_rx.blocking_recv() { 201 tracing::info!( 202 id = task.identifier, 203 kind = task.job.content.kind(), 204 "handling job" 205 ); 206 match task.job.content { 207 JobContent::Text(c) if !c.text.is_empty() => { 208 printer 209 .centered(&task.identifier, 0xCD)? 210 .markdown(&c.text)? 211 .newline()? 212 .centered(&task.date.to_string(), 0xCD)? 213 .newline()? 214 .cut()?; 215 } 216 _ => continue, 217 } 218 } 219 220 Ok(()) 221} 222 223async fn serve_api(password: Option<String>, task_tx: TaskTx) -> Result<()> { 224 use async_tiny::{Response, Server}; 225 226 let mut server = Server::http("0.0.0.0:9889", true).await.into_diagnostic()?; 227 228 while let Some(request) = server.next().await { 229 let url = request.url().to_string(); 230 let (path, query_str) = url.split_once('?').unwrap_or((&url, "")); 231 232 let params: std::collections::HashMap<&str, &str> = query_str 233 .split('&') 234 .filter_map(|p| p.split_once('=')) 235 .collect(); 236 237 if let Some(ref pw) = password { 238 if params.get("token").copied() != Some(pw.as_str()) { 239 let _ = request.respond(Response::from_status_and_string(400, "400 Bad Request")); 240 continue; 241 } 242 } 243 244 let Some(&name) = params.get("name") else { 245 let _ = request.respond(Response::from_status_and_string(400, "400 Bad Request")); 246 continue; 247 }; 248 let name = name.to_string(); 249 250 let response = match path { 251 "/print" => { 252 let text = String::from_utf8_lossy(request.body()).into_owned(); 253 let _ = task_tx 254 .send(Task { 255 date: Utc::now(), 256 identifier: name, 257 job: Job { 258 content: JobContent::Text(TextContent { text }), 259 }, 260 }) 261 .await; 262 Response::from_string("ok").with_content_type("application/json") 263 } 264 _ => Response::from_status_and_string(404, "404 Not Found"), 265 }; 266 267 let _ = request.respond(response); 268 } 269 270 Ok(()) 271}