printer on atproto
1use std::sync::Arc;
2use std::time::Duration;
3
4use chrono::{DateTime, Utc};
5use futures_util::StreamExt;
6use miette::{IntoDiagnostic, Result};
7use serde::{Deserialize, Serialize};
8use tokio::sync::mpsc;
9use tokio_tungstenite::tungstenite::Message;
10
11use crate::printer::{Printer, PrinterEndpoint};
12use crate::resolver::Resolver;
13
14mod printer;
15mod resolver;
16
17const LEX: &str = "net.klbr.spool.job";
18const JETSTREAM: [&str; 3] = [
19 "wss://jetstream2.fr.hose.cam",
20 "wss://jetstream2.us-east.bsky.network",
21 "wss://jetstream2.us-west.bsky.network",
22];
23
24#[derive(Deserialize)]
25struct Event {
26 #[serde(rename = "time_us")]
27 cursor: u64,
28 did: String,
29 kind: String,
30 commit: Option<Commit>,
31}
32
33#[derive(Deserialize)]
34struct Commit {
35 collection: String,
36 operation: String,
37 record: Option<Box<serde_json::value::RawValue>>,
38}
39
40#[derive(Deserialize, Serialize, Debug)]
41#[serde(rename = "net.klbr.spool.job", rename_all = "camelCase", tag = "$type")]
42struct Job {
43 content: JobContent,
44}
45
46#[derive(Deserialize, Serialize, Debug)]
47#[serde(tag = "$type")]
48enum JobContent {
49 #[serde(rename = "net.klbr.spool.job.content.text")]
50 Text(TextContent),
51}
52
53impl JobContent {
54 fn kind(&self) -> &'static str {
55 match self {
56 JobContent::Text(_) => "text",
57 }
58 }
59}
60
61#[derive(Deserialize, Serialize, Debug)]
62#[serde(rename_all = "camelCase")]
63struct TextContent {
64 text: String,
65}
66
67struct Task {
68 date: DateTime<Utc>,
69 identifier: String,
70 job: Job,
71}
72
73type TaskTx = mpsc::Sender<Task>;
74type TaskRx = mpsc::Receiver<Task>;
75
76#[tokio::main]
77async fn main() -> Result<()> {
78 tracing_subscriber::fmt().init();
79
80 let printer = Printer::connect(
81 PrinterEndpoint::Usb {
82 vendor_id: 0x0416,
83 product_id: 0x5011,
84 },
85 Default::default(),
86 )?;
87
88 let password = std::env::var("SPOOL_PASSWORD").ok();
89
90 let allowed_dids: Vec<String> = tokio::fs::read_to_string("allowed.txt")
91 .await
92 .into_diagnostic()?
93 .lines()
94 .map(|l| l.find('#').map_or(l, |i| &l[..i]).trim().to_string())
95 .filter(|l| !l.is_empty())
96 .collect();
97
98 // prewarm identity cache
99 let resolver = Arc::new(Resolver::new());
100 for did in &allowed_dids {
101 resolver.resolve(did).await?;
102 }
103
104 let mut jetstream_query = format!("wantedCollections={LEX}");
105 for did in &allowed_dids {
106 jetstream_query.push_str(&format!("&wantedDids={did}"));
107 }
108
109 let (task_tx, task_rx) = mpsc::channel(16);
110 let job_handler = tokio::task::spawn_blocking(move || handle_jobs(printer, task_rx));
111 let job_stream = tokio::spawn(stream_jobs(jetstream_query, task_tx.clone(), resolver));
112 let api = tokio::spawn(serve_api(password, task_tx));
113
114 tokio::select! {
115 r = job_stream => r.into_diagnostic().flatten(),
116 r = job_handler => r.into_diagnostic().flatten(),
117 r = api => r.into_diagnostic().flatten(),
118 }
119}
120
121async fn stream_jobs(query: String, task_tx: TaskTx, resolver: Arc<Resolver>) -> Result<()> {
122 let mut jetstream_idx = 0;
123 loop {
124 let jetstream = JETSTREAM[jetstream_idx];
125 let url = format!("{jetstream}/subscribe?{query}");
126 match run_stream(&url, &task_tx, &resolver).await {
127 Ok(()) => tracing::warn!("{url} disconnected, reconnecting..."),
128 Err(e) => {
129 tracing::warn!(err = %e, "{url} error, reconnecting...");
130 jetstream_idx = (jetstream_idx + 1) % JETSTREAM.len();
131 }
132 }
133 tokio::time::sleep(Duration::from_secs(5)).await;
134 }
135}
136
137async fn run_stream(
138 url: &str,
139 task_tx: &TaskTx,
140 resolver: &Resolver,
141) -> Result<(), tokio_tungstenite::tungstenite::Error> {
142 let (mut ws, _) = tokio_tungstenite::connect_async(url).await?;
143 while let Some(msg) = ws.next().await {
144 let text = match msg? {
145 Message::Text(t) => t,
146 _ => continue,
147 };
148 let event: Event = match serde_json::from_str(&text) {
149 Ok(e) => e,
150 Err(e) => {
151 tracing::warn!(err = %e, "failed to parse event");
152 continue;
153 }
154 };
155
156 // invalidate identity cache if needed
157 if event.kind == "identity" {
158 resolver.invalidate(&event.did).await;
159 continue;
160 }
161
162 // handle records
163 let Some(commit) = event.commit else { continue };
164 if commit.operation != "create" || commit.collection != LEX {
165 continue;
166 }
167 let Some(record_raw) = commit.record else {
168 continue;
169 };
170 let job: Job = match serde_json::from_str(record_raw.get()) {
171 Ok(j) => j,
172 Err(e) => {
173 tracing::warn!(err = %e, "invalid record");
174 continue;
175 }
176 };
177 let identifier = match resolver.resolve(&event.did).await {
178 Ok(h) => h,
179 Err(e) => {
180 tracing::error!(err = %e, "can't resolve did");
181 resolver
182 .cached(&event.did)
183 .await
184 .unwrap_or_else(|| event.did.clone())
185 }
186 };
187 let date = DateTime::from_timestamp_micros(event.cursor as i64).unwrap_or_else(Utc::now);
188 let _ = task_tx
189 .send(Task {
190 identifier,
191 job,
192 date,
193 })
194 .await;
195 }
196 Ok(())
197}
198
199fn handle_jobs(mut printer: Printer, mut task_rx: TaskRx) -> Result<()> {
200 while let Some(task) = task_rx.blocking_recv() {
201 tracing::info!(
202 id = task.identifier,
203 kind = task.job.content.kind(),
204 "handling job"
205 );
206 match task.job.content {
207 JobContent::Text(c) if !c.text.is_empty() => {
208 printer
209 .centered(&task.identifier, 0xCD)?
210 .markdown(&c.text)?
211 .newline()?
212 .centered(&task.date.to_string(), 0xCD)?
213 .newline()?
214 .cut()?;
215 }
216 _ => continue,
217 }
218 }
219
220 Ok(())
221}
222
223async fn serve_api(password: Option<String>, task_tx: TaskTx) -> Result<()> {
224 use async_tiny::{Response, Server};
225
226 let mut server = Server::http("0.0.0.0:9889", true).await.into_diagnostic()?;
227
228 while let Some(request) = server.next().await {
229 let url = request.url().to_string();
230 let (path, query_str) = url.split_once('?').unwrap_or((&url, ""));
231
232 let params: std::collections::HashMap<&str, &str> = query_str
233 .split('&')
234 .filter_map(|p| p.split_once('='))
235 .collect();
236
237 if let Some(ref pw) = password {
238 if params.get("token").copied() != Some(pw.as_str()) {
239 let _ = request.respond(Response::from_status_and_string(400, "400 Bad Request"));
240 continue;
241 }
242 }
243
244 let Some(&name) = params.get("name") else {
245 let _ = request.respond(Response::from_status_and_string(400, "400 Bad Request"));
246 continue;
247 };
248 let name = name.to_string();
249
250 let response = match path {
251 "/print" => {
252 let text = String::from_utf8_lossy(request.body()).into_owned();
253 let _ = task_tx
254 .send(Task {
255 date: Utc::now(),
256 identifier: name,
257 job: Job {
258 content: JobContent::Text(TextContent { text }),
259 },
260 })
261 .await;
262 Response::from_string("ok").with_content_type("application/json")
263 }
264 _ => Response::from_status_and_string(404, "404 Not Found"),
265 };
266
267 let _ = request.respond(response);
268 }
269
270 Ok(())
271}