//! Return of honkbot, in Rust. Hopefully it won't die all the time.
1use futures_util::StreamExt;
2use rand::seq::SliceRandom;
3use serde::Deserialize;
4use std::collections::HashSet;
5use std::io::Cursor;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::{RwLock, Semaphore};
10use tokio_tungstenite::tungstenite::Message;
11use tracing::{error, info, warn};
12
13// --- Command configuration ---
14
/// Static description of one command the bot responds to.
struct BotCommand {
    /// Text looked for in a post to activate the command, e.g. "/honk".
    trigger: &'static str,
    /// Text of the reply post.
    reply_text: &'static str,
    /// Search text sent to Flickr when looking for an image.
    flickr_search: &'static str,
    /// Alt text attached to the embedded image.
    alt_text: &'static str,
    /// Subfolder of the fallback image directory, e.g. "honk".
    fallback_folder: &'static str,
}
22
23const COMMANDS: &[BotCommand] = &[
24 BotCommand {
25 trigger: "/honk",
26 reply_text: "HONK",
27 flickr_search: "goose geese bird",
28 alt_text: "A goose",
29 fallback_folder: "honk",
30 },
31 BotCommand {
32 trigger: "/awoo",
33 reply_text: "AWOO",
34 flickr_search: "pup wolves wolf",
35 alt_text: "A wolf",
36 fallback_folder: "awoo",
37 },
38 BotCommand {
39 trigger: "/baaa",
40 reply_text: "BAAA",
41 flickr_search: "kid goats goat",
42 alt_text: "A goat",
43 fallback_folder: "baaa",
44 },
45 BotCommand {
46 trigger: "/bork",
47 reply_text: "BORK",
48 flickr_search: "puppy puppies dog",
49 alt_text: "A dog",
50 fallback_folder: "bork",
51 },
52 BotCommand {
53 trigger: "/hiss",
54 reply_text: "HISS",
55 flickr_search: "snake reptile hiss",
56 alt_text: "A snake",
57 fallback_folder: "hiss",
58 },
59 BotCommand {
60 trigger: "/hoot",
61 reply_text: "HOOT",
62 flickr_search: "owlet owls owl",
63 alt_text: "An owl",
64 fallback_folder: "hoot",
65 },
66 BotCommand {
67 trigger: "/meehh",
68 reply_text: "MEEHH",
69 flickr_search: "lamb sheep lambs",
70 alt_text: "A lamb",
71 fallback_folder: "meehh",
72 },
73 BotCommand {
74 trigger: "/meow",
75 reply_text: "MEOW",
76 flickr_search: "kitty kitties cat",
77 alt_text: "A cat",
78 fallback_folder: "meow",
79 },
80 BotCommand {
81 trigger: "/mumble",
82 reply_text: "MUMBLE",
83 flickr_search: "marmot marmots rodent nature",
84 alt_text: "A marmot",
85 fallback_folder: "mumble",
86 },
87 BotCommand {
88 trigger: "/oink",
89 reply_text: "OINK",
90 flickr_search: "pig piggy oink farm",
91 alt_text: "A pig",
92 fallback_folder: "oink",
93 },
94 BotCommand {
95 trigger: "/ribbit",
96 reply_text: "RIBBIT",
97 flickr_search: "frog frogs amphibian nature",
98 alt_text: "A frog",
99 fallback_folder: "ribbit",
100 },
101 BotCommand {
102 trigger: "/squee",
103 reply_text: "SQUEE",
104 flickr_search: "capybaras rodent capybaras nature",
105 alt_text: "A capybara",
106 fallback_folder: "squee",
107 },
108 BotCommand {
109 trigger: "/yowl",
110 reply_text: "YOWL",
111 flickr_search: "lynx bobcats bobcat",
112 alt_text: "A bobcat",
113 fallback_folder: "yowl",
114 },
115];
116
117// --- Jetstream types ---
118
119#[derive(Debug, Deserialize)]
120struct JetstreamEvent {
121 did: Option<String>,
122 time_us: Option<u64>,
123 #[allow(dead_code)]
124 kind: Option<String>,
125 commit: Option<JetstreamCommit>,
126}
127
128#[derive(Debug, Deserialize)]
129struct JetstreamCommit {
130 #[allow(dead_code)]
131 rev: Option<String>,
132 operation: Option<String>,
133 collection: Option<String>,
134 rkey: Option<String>,
135 record: Option<serde_json::Value>,
136 cid: Option<String>,
137}
138
139// --- Flickr types ---
140
141#[derive(Debug, Deserialize)]
142struct FlickrPhoto {
143 id: String,
144}
145
146#[derive(Debug, Deserialize)]
147struct FlickrPhotosResult {
148 photo: Vec<FlickrPhoto>,
149}
150
151#[derive(Debug, Deserialize)]
152struct FlickrSearchResponse {
153 photos: FlickrPhotosResult,
154}
155
156#[derive(Debug, Deserialize)]
157struct FlickrSize {
158 source: String,
159 #[allow(dead_code)]
160 label: Option<String>,
161}
162
163#[derive(Debug, Deserialize)]
164struct FlickrSizes {
165 size: Vec<FlickrSize>,
166}
167
168#[derive(Debug, Deserialize)]
169struct FlickrSizesResponse {
170 sizes: FlickrSizes,
171}
172
173// --- Constellation (follower check) ---
174
175#[derive(Debug, Deserialize)]
176struct ConstellationBacklinks {
177 total: u64,
178}
179
180/// Check if `follower_did` follows the bot (`bot_did`) using Constellation's getBacklinks.
181/// Returns true if a follow record exists, false otherwise.
182async fn is_follower_constellation(
183 http: &reqwest::Client,
184 bot_did: &str,
185 follower_did: &str,
186) -> anyhow::Result<bool> {
187 let url = format!(
188 "https://constellation.microcosm.blue/xrpc/blue.microcosm.links.getBacklinks?subject={}&source=app.bsky.graph.follow:subject&did={}&limit=1",
189 urlencoding::encode(bot_did),
190 urlencoding::encode(follower_did),
191 );
192
193 let resp: ConstellationBacklinks = http.get(&url).send().await?.json().await?;
194 Ok(resp.total > 0)
195}
196
197// --- Follower cache ---
198
199/// In-memory cache of follower DIDs, updated in real-time via the firehose.
200/// Falls back to Constellation API for DIDs not yet seen in the firehose.
201struct FollowerCache {
202 /// DIDs known to be followers
203 followers: RwLock<HashSet<String>>,
204 /// DIDs known to NOT be followers (to avoid repeated Constellation lookups)
205 non_followers: RwLock<HashSet<String>>,
206}
207
208impl FollowerCache {
209 fn new() -> Arc<Self> {
210 Arc::new(Self {
211 followers: RwLock::new(HashSet::new()),
212 non_followers: RwLock::new(HashSet::new()),
213 })
214 }
215
216 /// Record a new follow from the firehose
217 async fn add_follower(&self, did: &str) {
218 self.non_followers.write().await.remove(did);
219 self.followers.write().await.insert(did.to_string());
220 info!(did = %did, "Follower cache: added follower");
221 }
222
223 /// Record an unfollow from the firehose
224 async fn remove_follower(&self, did: &str) {
225 self.followers.write().await.remove(did);
226 self.non_followers.write().await.insert(did.to_string());
227 info!(did = %did, "Follower cache: removed follower");
228 }
229
230 /// Check if a DID is a follower, consulting cache first then Constellation.
231 async fn is_follower(
232 &self,
233 http: &reqwest::Client,
234 bot_did: &str,
235 did: &str,
236 ) -> anyhow::Result<bool> {
237 // Check positive cache
238 if self.followers.read().await.contains(did) {
239 return Ok(true);
240 }
241
242 // Check negative cache
243 if self.non_followers.read().await.contains(did) {
244 return Ok(false);
245 }
246
247 // Not in cache — fall back to Constellation and cache the result
248 let result = is_follower_constellation(http, bot_did, did).await?;
249 if result {
250 self.followers.write().await.insert(did.to_string());
251 } else {
252 self.non_followers.write().await.insert(did.to_string());
253 }
254 Ok(result)
255 }
256}
257
258// Bluesky API uses serde_json::json! for record construction
259
260// --- Bluesky session ---
261
262#[derive(Debug, Deserialize)]
263struct Session {
264 #[serde(rename = "accessJwt")]
265 access_jwt: String,
266 did: String,
267}
268
269struct BskyClient {
270 http: reqwest::Client,
271 service: String,
272 access_jwt: tokio::sync::RwLock<String>,
273 did: String,
274 handle: String,
275 password: String,
276}
277
278impl BskyClient {
279 async fn login(
280 http: &reqwest::Client,
281 service: &str,
282 handle: &str,
283 password: &str,
284 ) -> anyhow::Result<Session> {
285 let resp = http
286 .post(format!("{}/xrpc/com.atproto.server.createSession", service))
287 .json(&serde_json::json!({
288 "identifier": handle,
289 "password": password,
290 }))
291 .send()
292 .await?;
293
294 if !resp.status().is_success() {
295 let status = resp.status();
296 let body = resp.text().await.unwrap_or_default();
297 anyhow::bail!("Login failed ({}): {}", status, body);
298 }
299
300 Ok(resp.json::<Session>().await?)
301 }
302
303 async fn new(service: String, handle: String, password: String) -> anyhow::Result<Arc<Self>> {
304 let http = reqwest::Client::new();
305 let session = Self::login(&http, &service, &handle, &password).await?;
306 info!(did = %session.did, "Logged in to Bluesky");
307
308 Ok(Arc::new(Self {
309 http,
310 service,
311 access_jwt: tokio::sync::RwLock::new(session.access_jwt),
312 did: session.did,
313 handle,
314 password,
315 }))
316 }
317
318 async fn refresh_session(&self) -> anyhow::Result<()> {
319 let session = Self::login(&self.http, &self.service, &self.handle, &self.password).await?;
320 let mut jwt = self.access_jwt.write().await;
321 *jwt = session.access_jwt;
322 info!("Refreshed Bluesky session");
323 Ok(())
324 }
325
326 async fn upload_blob(&self, data: &[u8], mime: &str) -> anyhow::Result<serde_json::Value> {
327 let jwt = self.access_jwt.read().await.clone();
328 let resp = self
329 .http
330 .post(format!(
331 "{}/xrpc/com.atproto.repo.uploadBlob",
332 self.service
333 ))
334 .header("Authorization", format!("Bearer {}", jwt))
335 .header("Content-Type", mime)
336 .body(data.to_vec())
337 .send()
338 .await?;
339
340 if !resp.status().is_success() {
341 let status = resp.status();
342 let body = resp.text().await.unwrap_or_default();
343 anyhow::bail!("Upload blob failed ({}): {}", status, body);
344 }
345
346 let body: serde_json::Value = resp.json().await?;
347 Ok(body["blob"].clone())
348 }
349
350 async fn create_record(&self, request: serde_json::Value) -> anyhow::Result<serde_json::Value> {
351 let jwt = self.access_jwt.read().await.clone();
352 let resp = self
353 .http
354 .post(format!(
355 "{}/xrpc/com.atproto.repo.createRecord",
356 self.service
357 ))
358 .header("Authorization", format!("Bearer {}", jwt))
359 .json(&request)
360 .send()
361 .await?;
362
363 if !resp.status().is_success() {
364 let status = resp.status();
365 let body = resp.text().await.unwrap_or_default();
366 anyhow::bail!("Create record failed ({}): {}", status, body);
367 }
368
369 Ok(resp.json::<serde_json::Value>().await?)
370 }
371}
372
373// --- Flickr ---
374
375async fn fetch_random_flickr_image(
376 http: &reqwest::Client,
377 flickr_key: &str,
378 search_text: &str,
379) -> anyhow::Result<Vec<u8>> {
380 // Search for photos
381 let search_url = format!(
382 "https://api.flickr.com/services/rest?api_key={}&method=flickr.photos.search&text={}&sort=interestingness-desc&content_type=0&format=json&nojsoncallback=1",
383 flickr_key,
384 urlencoding::encode(search_text)
385 );
386
387 let resp: FlickrSearchResponse = http.get(&search_url).send().await?.json().await?;
388
389 if resp.photos.photo.is_empty() {
390 anyhow::bail!("No photos found for search: {}", search_text);
391 }
392
393 // Pick a random photo
394 let photo = resp
395 .photos
396 .photo
397 .choose(&mut rand::thread_rng())
398 .ok_or_else(|| anyhow::anyhow!("Failed to pick random photo"))?;
399
400 // Get photo sizes
401 let sizes_url = format!(
402 "https://api.flickr.com/services/rest?api_key={}&method=flickr.photos.getSizes&format=json&nojsoncallback=1&photo_id={}",
403 flickr_key, photo.id
404 );
405
406 let sizes_resp: FlickrSizesResponse = http.get(&sizes_url).send().await?.json().await?;
407
408 // Pick a medium-large size (similar to bash: tail -n 4 | head -n 1 picks 4th from end)
409 let sizes = &sizes_resp.sizes.size;
410 if sizes.is_empty() {
411 anyhow::bail!("No sizes available for photo {}", photo.id);
412 }
413 let idx = if sizes.len() >= 4 {
414 sizes.len() - 4
415 } else {
416 0
417 };
418 let image_url = &sizes[idx].source;
419
420 // Download image
421 let image_bytes = http.get(image_url).send().await?.bytes().await?;
422
423 // Resize if > 1MB (Bluesky blob limit)
424 let mut data = image_bytes.to_vec();
425 if data.len() > 1_000_000 {
426 info!(
427 size = data.len(),
428 "Image too large, resizing to fit under 1MB"
429 );
430 data = resize_image(&data, 1_000_000)?;
431 }
432
433 Ok(data)
434}
435
436fn resize_image(data: &[u8], max_bytes: usize) -> anyhow::Result<Vec<u8>> {
437 let img = image::load_from_memory(data)?;
438
439 // Iteratively reduce quality/size
440 let mut quality = 85u8;
441 let mut current = img.clone();
442
443 loop {
444 let mut buf = Vec::new();
445 let mut cursor = Cursor::new(&mut buf);
446 current.write_to(&mut cursor, image::ImageFormat::Jpeg)?;
447
448 if buf.len() <= max_bytes || quality <= 10 {
449 return Ok(buf);
450 }
451
452 // Reduce quality or dimensions
453 if quality > 20 {
454 quality -= 10;
455 let mut buf2 = Vec::new();
456 {
457 let mut encoder =
458 image::codecs::jpeg::JpegEncoder::new_with_quality(&mut buf2, quality);
459 encoder.encode_image(¤t)?;
460 }
461 if buf2.len() <= max_bytes {
462 return Ok(buf2);
463 }
464 }
465
466 // Scale down by 75%
467 let new_w = (current.width() * 3) / 4;
468 let new_h = (current.height() * 3) / 4;
469 if new_w < 100 || new_h < 100 {
470 // Just return what we have at lowest quality
471 let mut buf2 = Vec::new();
472 let mut encoder = image::codecs::jpeg::JpegEncoder::new_with_quality(&mut buf2, 10);
473 encoder.encode_image(¤t)?;
474 return Ok(buf2);
475 }
476 current = current.resize(new_w, new_h, image::imageops::FilterType::Lanczos3);
477 }
478}
479
480// --- Fallback images ---
481
482fn pick_random_fallback_image(fallback_dir: &Path, folder: &str) -> anyhow::Result<Vec<u8>> {
483 let dir = fallback_dir.join(folder);
484 let entries: Vec<PathBuf> = std::fs::read_dir(&dir)?
485 .filter_map(|e| e.ok())
486 .map(|e| e.path())
487 .filter(|p| {
488 p.extension()
489 .and_then(|ext| ext.to_str())
490 .map(|ext| matches!(ext.to_lowercase().as_str(), "jpg" | "jpeg" | "png" | "webp"))
491 .unwrap_or(false)
492 })
493 .collect();
494
495 if entries.is_empty() {
496 anyhow::bail!("No fallback images in {}", dir.display());
497 }
498
499 let path = entries
500 .choose(&mut rand::thread_rng())
501 .ok_or_else(|| anyhow::anyhow!("Failed to pick fallback image"))?;
502
503 let data = std::fs::read(path)?;
504 info!(
505 path = %path.display(),
506 "Using fallback image"
507 );
508 Ok(data)
509}
510
511// --- Main logic ---
512
513async fn handle_command(
514 bsky: Arc<BskyClient>,
515 flickr_key: String,
516 fallback_dir: PathBuf,
517 post_uri: String,
518 post_cid: String,
519 command: &'static BotCommand,
520) {
521 let http = reqwest::Client::new();
522
523 // Fetch random image from Flickr, fall back to local images on failure
524 let image_data: Vec<u8> = match fetch_random_flickr_image(&http, &flickr_key, command.flickr_search).await {
525 Ok(data) => data,
526 Err(e) => {
527 warn!(
528 command = command.trigger,
529 uri = %post_uri,
530 error = %e,
531 "Flickr fetch failed, trying fallback images"
532 );
533 match pick_random_fallback_image(&fallback_dir, command.fallback_folder) {
534 Ok(data) => data,
535 Err(e2) => {
536 error!(
537 command = command.trigger,
538 uri = %post_uri,
539 flickr_error = %e,
540 fallback_error = %e2,
541 "Both Flickr and fallback images failed"
542 );
543 return;
544 }
545 }
546 }
547 };
548
549 // Upload blob to Bluesky
550 let blob = match bsky.upload_blob(&image_data, "image/jpeg").await {
551 Ok(b) => b,
552 Err(e) => {
553 // Try refreshing session and retrying once
554 warn!(error = %e, "Blob upload failed, refreshing session and retrying");
555 if let Err(e2) = bsky.refresh_session().await {
556 error!(error = %e2, "Session refresh failed");
557 return;
558 }
559 match bsky.upload_blob(&image_data, "image/jpeg").await {
560 Ok(b) => b,
561 Err(e2) => {
562 error!(error = %e2, "Blob upload failed after retry");
563 return;
564 }
565 }
566 }
567 };
568
569 // Create reply post
570 let now = chrono_now();
571 let record = serde_json::json!({
572 "repo": bsky.did,
573 "collection": "app.bsky.feed.post",
574 "record": {
575 "$type": "app.bsky.feed.post",
576 "text": command.reply_text,
577 "createdAt": now,
578 "reply": {
579 "root": {
580 "uri": post_uri,
581 "cid": post_cid,
582 },
583 "parent": {
584 "uri": post_uri,
585 "cid": post_cid,
586 }
587 },
588 "embed": {
589 "$type": "app.bsky.embed.images",
590 "images": [{
591 "alt": command.alt_text,
592 "image": blob,
593 }]
594 }
595 }
596 });
597
598 match bsky.create_record(record).await {
599 Ok(_) => {
600 info!(
601 command = command.trigger,
602 uri = %post_uri,
603 "{} reply sent", command.reply_text
604 );
605 }
606 Err(e) => {
607 // Try refreshing and retrying once
608 warn!(error = %e, "Create record failed, refreshing session");
609 if let Err(e2) = bsky.refresh_session().await {
610 error!(error = %e2, "Session refresh failed");
611 return;
612 }
613 // Re-upload blob with new session
614 let blob2 = match bsky.upload_blob(&image_data, "image/jpeg").await {
615 Ok(b) => b,
616 Err(e2) => {
617 error!(error = %e2, "Blob re-upload failed after session refresh");
618 return;
619 }
620 };
621 let record2 = serde_json::json!({
622 "repo": bsky.did,
623 "collection": "app.bsky.feed.post",
624 "record": {
625 "$type": "app.bsky.feed.post",
626 "text": command.reply_text,
627 "createdAt": chrono_now(),
628 "reply": {
629 "root": {
630 "uri": post_uri,
631 "cid": post_cid,
632 },
633 "parent": {
634 "uri": post_uri,
635 "cid": post_cid,
636 }
637 },
638 "embed": {
639 "$type": "app.bsky.embed.images",
640 "images": [{
641 "alt": command.alt_text,
642 "image": blob2,
643 }]
644 }
645 }
646 });
647 match bsky.create_record(record2).await {
648 Ok(_) => {
649 info!(
650 command = command.trigger,
651 uri = %post_uri,
652 "{} reply sent (after retry)", command.reply_text
653 );
654 }
655 Err(e2) => {
656 error!(
657 command = command.trigger,
658 uri = %post_uri,
659 error = %e2,
660 "Create record failed after retry"
661 );
662 }
663 }
664 }
665 }
666}
667
668fn chrono_now() -> String {
669 use std::time::SystemTime;
670 let d = SystemTime::now()
671 .duration_since(SystemTime::UNIX_EPOCH)
672 .unwrap();
673 let secs = d.as_secs();
674 let millis = d.subsec_millis();
675 // Format as ISO 8601
676 let days_since_epoch = secs / 86400;
677 let time_of_day = secs % 86400;
678 let hours = time_of_day / 3600;
679 let minutes = (time_of_day % 3600) / 60;
680 let seconds = time_of_day % 60;
681
682 // Simple date calculation
683 let mut y = 1970i64;
684 let mut remaining = days_since_epoch as i64;
685 loop {
686 let days_in_year = if is_leap(y) { 366 } else { 365 };
687 if remaining < days_in_year {
688 break;
689 }
690 remaining -= days_in_year;
691 y += 1;
692 }
693 let month_days = if is_leap(y) {
694 [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
695 } else {
696 [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
697 };
698 let mut m = 0;
699 for (i, &days) in month_days.iter().enumerate() {
700 if remaining < days {
701 m = i;
702 break;
703 }
704 remaining -= days;
705 }
706
707 format!(
708 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z",
709 y,
710 m + 1,
711 remaining + 1,
712 hours,
713 minutes,
714 seconds,
715 millis
716 )
717}
718
/// Reports whether `y` is a leap year in the Gregorian calendar: divisible by
/// 400, or divisible by 4 but not by 100.
fn is_leap(y: i64) -> bool {
    if y % 400 == 0 {
        true
    } else if y % 100 == 0 {
        false
    } else {
        y % 4 == 0
    }
}
722
723fn find_matching_command(text: &str) -> Option<&'static BotCommand> {
724 let lower = text.to_lowercase();
725 COMMANDS.iter().find(|cmd| lower.contains(cmd.trigger))
726}
727
728async fn connect_and_stream(
729 bsky: Arc<BskyClient>,
730 flickr_key: String,
731 fallback_dir: PathBuf,
732 follows_only: bool,
733 semaphore: Arc<Semaphore>,
734 follower_cache: Arc<FollowerCache>,
735) -> anyhow::Result<()> {
736 let jetstream_url = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.graph.follow";
737
738 info!("Connecting to Jetstream...");
739 let (ws_stream, _) = tokio_tungstenite::connect_async(jetstream_url).await?;
740 info!("Connected to Jetstream");
741
742 let (_write, mut read) = ws_stream.split();
743
744 let mut last_cursor: u64 = 0;
745
746 while let Some(msg) = read.next().await {
747 let msg = match msg {
748 Ok(m) => m,
749 Err(e) => {
750 error!(error = %e, "WebSocket read error");
751 return Err(e.into());
752 }
753 };
754
755 let text = match msg {
756 Message::Text(t) => t,
757 Message::Ping(_) => continue,
758 Message::Pong(_) => continue,
759 Message::Close(_) => {
760 warn!("WebSocket closed by server");
761 return Ok(());
762 }
763 _ => continue,
764 };
765
766 // Parse event
767 let event: JetstreamEvent = match serde_json::from_str(&text) {
768 Ok(e) => e,
769 Err(_) => continue,
770 };
771
772 // Track cursor for reconnection
773 if let Some(t) = event.time_us {
774 last_cursor = t;
775 }
776
777 // Only care about commit events with create operation on posts
778 let commit = match &event.commit {
779 Some(c) => c,
780 None => continue,
781 };
782
783 // Handle follow/unfollow events for our follower cache
784 if commit.collection.as_deref() == Some("app.bsky.graph.follow") {
785 if let Some(did) = &event.did {
786 // Check if this follow targets our bot
787 let is_about_us = commit
788 .record
789 .as_ref()
790 .and_then(|r| r.get("subject"))
791 .and_then(|s| s.as_str())
792 == Some(&bsky.did);
793
794 if is_about_us {
795 match commit.operation.as_deref() {
796 Some("create") => {
797 follower_cache.add_follower(did).await;
798 }
799 Some("delete") => {
800 follower_cache.remove_follower(did).await;
801 }
802 _ => {}
803 }
804 }
805 }
806 continue;
807 }
808
809 if commit.operation.as_deref() != Some("create") {
810 continue;
811 }
812 if commit.collection.as_deref() != Some("app.bsky.feed.post") {
813 continue;
814 }
815
816 // Check the post text for a trigger
817 let record = match &commit.record {
818 Some(r) => r,
819 None => continue,
820 };
821
822 let post_text = match record.get("text").and_then(|t| t.as_str()) {
823 Some(t) => t,
824 None => continue,
825 };
826
827 let command = match find_matching_command(post_text) {
828 Some(c) => c,
829 None => continue,
830 };
831
832 let did = match &event.did {
833 Some(d) => d.clone(),
834 None => continue,
835 };
836 let rkey = match &commit.rkey {
837 Some(r) => r.clone(),
838 None => continue,
839 };
840 let cid = match &commit.cid {
841 Some(c) => c.clone(),
842 None => continue,
843 };
844
845 let post_uri = format!("at://{}/app.bsky.feed.post/{}", did, rkey);
846
847 // Check if the poster follows the bot (if FOLLOWS_ONLY is enabled)
848 if follows_only {
849 match follower_cache.is_follower(&bsky.http, &bsky.did, &did).await {
850 Ok(true) => {}
851 Ok(false) => {
852 info!(
853 command = command.trigger,
854 poster = %did,
855 "Ignoring command from non-follower"
856 );
857 continue;
858 }
859 Err(e) => {
860 warn!(
861 command = command.trigger,
862 poster = %did,
863 error = %e,
864 "Follower check failed, skipping"
865 );
866 continue;
867 }
868 }
869 }
870
871 info!(
872 command = command.trigger,
873 uri = %post_uri,
874 "Matched command, queuing reply"
875 );
876
877 // Spawn bounded task
878 let bsky = bsky.clone();
879 let flickr_key = flickr_key.clone();
880 let fallback_dir = fallback_dir.clone();
881 let permit = semaphore.clone().acquire_owned().await?;
882
883 tokio::spawn(async move {
884 handle_command(bsky, flickr_key, fallback_dir, post_uri, cid, command).await;
885 drop(permit);
886 });
887 }
888
889 warn!(cursor = last_cursor, "Jetstream stream ended");
890 Ok(())
891}
892
893#[tokio::main]
894async fn main() -> anyhow::Result<()> {
895 // Initialize logging
896 tracing_subscriber::fmt()
897 .with_env_filter(
898 tracing_subscriber::EnvFilter::try_from_default_env()
899 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
900 )
901 .init();
902
903 // Load .env
904 dotenvy::dotenv().ok();
905
906 let bsky_handle =
907 std::env::var("BSKY_HANDLE").expect("BSKY_HANDLE env var required");
908 let bsky_password =
909 std::env::var("BSKY_PASSWORD").expect("BSKY_PASSWORD env var required");
910 let flickr_key =
911 std::env::var("FLICKR_KEY").expect("FLICKR_KEY env var required");
912 let bsky_service = std::env::var("BSKY_SERVICE")
913 .unwrap_or_else(|_| "https://bsky.social".to_string());
914 let follows_only = std::env::var("FOLLOWS_ONLY")
915 .map(|v| matches!(v.to_lowercase().as_str(), "1" | "true" | "yes"))
916 .unwrap_or(false);
917
918 // Resolve fallback image directory (default: ./fallback_images)
919 let fallback_dir = PathBuf::from(
920 std::env::var("FALLBACK_IMAGE_DIR").unwrap_or_else(|_| "fallback_images".to_string()),
921 );
922 let fallback_dir = std::fs::canonicalize(&fallback_dir).unwrap_or(fallback_dir);
923
924 info!(
925 handle = %bsky_handle,
926 service = %bsky_service,
927 fallback_dir = %fallback_dir.display(),
928 follows_only = follows_only,
929 commands = COMMANDS.len(),
930 "Starting honkbot"
931 );
932
933 // Login to Bluesky
934 let bsky: Arc<BskyClient> = BskyClient::new(bsky_service, bsky_handle, bsky_password).await?;
935
936 // Limit concurrent reply tasks (don't flood APIs)
937 let semaphore = Arc::new(Semaphore::new(10));
938
939 // Follower cache — updated in real-time via firehose follow events
940 let follower_cache = FollowerCache::new();
941
942 // Main loop with automatic reconnection
943 loop {
944 match connect_and_stream(bsky.clone(), flickr_key.clone(), fallback_dir.clone(), follows_only, semaphore.clone(), follower_cache.clone()).await {
945 Ok(()) => {
946 warn!("Stream ended normally, reconnecting in 5s...");
947 }
948 Err(e) => {
949 error!(error = %e, "Stream error, reconnecting in 5s...");
950 }
951 }
952
953 // Refresh session before reconnecting
954 if let Err(e) = bsky.refresh_session().await {
955 error!(error = %e, "Failed to refresh session before reconnect");
956 }
957
958 tokio::time::sleep(Duration::from_secs(5)).await;
959 }
960}