use std::sync::Arc; use std::time::Duration; use chrono::{DateTime, Utc}; use futures_util::StreamExt; use miette::{IntoDiagnostic, Result}; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; use tokio_tungstenite::tungstenite::Message; use crate::printer::{Printer, PrinterEndpoint}; use crate::resolver::Resolver; mod printer; mod resolver; const LEX: &str = "net.klbr.spool.job"; const JETSTREAM: [&str; 3] = [ "wss://jetstream2.fr.hose.cam", "wss://jetstream2.us-east.bsky.network", "wss://jetstream2.us-west.bsky.network", ]; #[derive(Deserialize)] struct Event { #[serde(rename = "time_us")] cursor: u64, did: String, kind: String, commit: Option, } #[derive(Deserialize)] struct Commit { collection: String, operation: String, record: Option>, } #[derive(Deserialize, Serialize, Debug)] #[serde(rename = "net.klbr.spool.job", rename_all = "camelCase", tag = "$type")] struct Job { content: JobContent, } #[derive(Deserialize, Serialize, Debug)] #[serde(tag = "$type")] enum JobContent { #[serde(rename = "net.klbr.spool.job.content.text")] Text(TextContent), } impl JobContent { fn kind(&self) -> &'static str { match self { JobContent::Text(_) => "text", } } } #[derive(Deserialize, Serialize, Debug)] #[serde(rename_all = "camelCase")] struct TextContent { text: String, } struct Task { date: DateTime, identifier: String, job: Job, } type TaskTx = mpsc::Sender; type TaskRx = mpsc::Receiver; #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt().init(); let printer = Printer::connect( PrinterEndpoint::Usb { vendor_id: 0x0416, product_id: 0x5011, }, Default::default(), )?; let password = std::env::var("SPOOL_PASSWORD").ok(); let allowed_dids: Vec = tokio::fs::read_to_string("allowed.txt") .await .into_diagnostic()? .lines() .map(|l| l.find('#').map_or(l, |i| &l[..i]).trim().to_string()) .filter(|l| !l.is_empty()) .collect(); // prewarm identity cache let resolver = Arc::new(Resolver::new()); for did in &allowed_dids { resolver.resolve(did).await?; } let mut jetstream_query = format!("wantedCollections={LEX}"); for did in &allowed_dids { jetstream_query.push_str(&format!("&wantedDids={did}")); } let (task_tx, task_rx) = mpsc::channel(16); let job_handler = tokio::task::spawn_blocking(move || handle_jobs(printer, task_rx)); let job_stream = tokio::spawn(stream_jobs(jetstream_query, task_tx.clone(), resolver)); let api = tokio::spawn(serve_api(password, task_tx)); tokio::select! { r = job_stream => r.into_diagnostic().flatten(), r = job_handler => r.into_diagnostic().flatten(), r = api => r.into_diagnostic().flatten(), } } async fn stream_jobs(query: String, task_tx: TaskTx, resolver: Arc) -> Result<()> { let mut jetstream_idx = 0; loop { let jetstream = JETSTREAM[jetstream_idx]; let url = format!("{jetstream}/subscribe?{query}"); match run_stream(&url, &task_tx, &resolver).await { Ok(()) => tracing::warn!("{url} disconnected, reconnecting..."), Err(e) => { tracing::warn!(err = %e, "{url} error, reconnecting..."); jetstream_idx = (jetstream_idx + 1) % JETSTREAM.len(); } } tokio::time::sleep(Duration::from_secs(5)).await; } } async fn run_stream( url: &str, task_tx: &TaskTx, resolver: &Resolver, ) -> Result<(), tokio_tungstenite::tungstenite::Error> { let (mut ws, _) = tokio_tungstenite::connect_async(url).await?; while let Some(msg) = ws.next().await { let text = match msg? { Message::Text(t) => t, _ => continue, }; let event: Event = match serde_json::from_str(&text) { Ok(e) => e, Err(e) => { tracing::warn!(err = %e, "failed to parse event"); continue; } }; // invalidate identity cache if needed if event.kind == "identity" { resolver.invalidate(&event.did).await; continue; } // handle records let Some(commit) = event.commit else { continue }; if commit.operation != "create" || commit.collection != LEX { continue; } let Some(record_raw) = commit.record else { continue; }; let job: Job = match serde_json::from_str(record_raw.get()) { Ok(j) => j, Err(e) => { tracing::warn!(err = %e, "invalid record"); continue; } }; let identifier = match resolver.resolve(&event.did).await { Ok(h) => h, Err(e) => { tracing::error!(err = %e, "can't resolve did"); resolver .cached(&event.did) .await .unwrap_or_else(|| event.did.clone()) } }; let date = DateTime::from_timestamp_micros(event.cursor as i64).unwrap_or_else(Utc::now); let _ = task_tx .send(Task { identifier, job, date, }) .await; } Ok(()) } fn handle_jobs(mut printer: Printer, mut task_rx: TaskRx) -> Result<()> { while let Some(task) = task_rx.blocking_recv() { tracing::info!( id = task.identifier, kind = task.job.content.kind(), "handling job" ); match task.job.content { JobContent::Text(c) if !c.text.is_empty() => { printer .centered(&task.identifier, 0xCD)? .markdown(&c.text)? .newline()? .centered(&task.date.to_string(), 0xCD)? .newline()? .cut()?; } _ => continue, } } Ok(()) } async fn serve_api(password: Option, task_tx: TaskTx) -> Result<()> { use async_tiny::{Response, Server}; let mut server = Server::http("0.0.0.0:9889", true).await.into_diagnostic()?; while let Some(request) = server.next().await { let url = request.url().to_string(); let (path, query_str) = url.split_once('?').unwrap_or((&url, "")); let params: std::collections::HashMap<&str, &str> = query_str .split('&') .filter_map(|p| p.split_once('=')) .collect(); if let Some(ref pw) = password { if params.get("token").copied() != Some(pw.as_str()) { let _ = request.respond(Response::from_status_and_string(400, "400 Bad Request")); continue; } } let Some(&name) = params.get("name") else { let _ = request.respond(Response::from_status_and_string(400, "400 Bad Request")); continue; }; let name = name.to_string(); let response = match path { "/print" => { let text = String::from_utf8_lossy(request.body()).into_owned(); let _ = task_tx .send(Task { date: Utc::now(), identifier: name, job: Job { content: JobContent::Text(TextContent { text }), }, }) .await; Response::from_string("ok").with_content_type("application/json") } _ => Response::from_status_and_string(404, "404 Not Found"), }; let _ = request.respond(response); } Ok(()) }