Return of honkbot, in Rust. Hopefully it won't die all the time.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 960 lines 29 kB view raw
1use futures_util::StreamExt; 2use rand::seq::SliceRandom; 3use serde::Deserialize; 4use std::collections::HashSet; 5use std::io::Cursor; 6use std::path::{Path, PathBuf}; 7use std::sync::Arc; 8use std::time::Duration; 9use tokio::sync::{RwLock, Semaphore}; 10use tokio_tungstenite::tungstenite::Message; 11use tracing::{error, info, warn}; 12 13// --- Command configuration --- 14 15struct BotCommand { 16 trigger: &'static str, // e.g. "/honk" 17 reply_text: &'static str, 18 flickr_search: &'static str, 19 alt_text: &'static str, 20 fallback_folder: &'static str, // e.g. "honk" 21} 22 23const COMMANDS: &[BotCommand] = &[ 24 BotCommand { 25 trigger: "/honk", 26 reply_text: "HONK", 27 flickr_search: "goose geese bird", 28 alt_text: "A goose", 29 fallback_folder: "honk", 30 }, 31 BotCommand { 32 trigger: "/awoo", 33 reply_text: "AWOO", 34 flickr_search: "pup wolves wolf", 35 alt_text: "A wolf", 36 fallback_folder: "awoo", 37 }, 38 BotCommand { 39 trigger: "/baaa", 40 reply_text: "BAAA", 41 flickr_search: "kid goats goat", 42 alt_text: "A goat", 43 fallback_folder: "baaa", 44 }, 45 BotCommand { 46 trigger: "/bork", 47 reply_text: "BORK", 48 flickr_search: "puppy puppies dog", 49 alt_text: "A dog", 50 fallback_folder: "bork", 51 }, 52 BotCommand { 53 trigger: "/hiss", 54 reply_text: "HISS", 55 flickr_search: "snake reptile hiss", 56 alt_text: "A snake", 57 fallback_folder: "hiss", 58 }, 59 BotCommand { 60 trigger: "/hoot", 61 reply_text: "HOOT", 62 flickr_search: "owlet owls owl", 63 alt_text: "An owl", 64 fallback_folder: "hoot", 65 }, 66 BotCommand { 67 trigger: "/meehh", 68 reply_text: "MEEHH", 69 flickr_search: "lamb sheep lambs", 70 alt_text: "A lamb", 71 fallback_folder: "meehh", 72 }, 73 BotCommand { 74 trigger: "/meow", 75 reply_text: "MEOW", 76 flickr_search: "kitty kitties cat", 77 alt_text: "A cat", 78 fallback_folder: "meow", 79 }, 80 BotCommand { 81 trigger: "/mumble", 82 reply_text: "MUMBLE", 83 flickr_search: "marmot marmots rodent nature", 84 alt_text: "A marmot", 85 fallback_folder: "mumble", 86 }, 87 BotCommand { 88 trigger: "/oink", 89 reply_text: "OINK", 90 flickr_search: "pig piggy oink farm", 91 alt_text: "A pig", 92 fallback_folder: "oink", 93 }, 94 BotCommand { 95 trigger: "/ribbit", 96 reply_text: "RIBBIT", 97 flickr_search: "frog frogs amphibian nature", 98 alt_text: "A frog", 99 fallback_folder: "ribbit", 100 }, 101 BotCommand { 102 trigger: "/squee", 103 reply_text: "SQUEE", 104 flickr_search: "capybaras rodent capybaras nature", 105 alt_text: "A capybara", 106 fallback_folder: "squee", 107 }, 108 BotCommand { 109 trigger: "/yowl", 110 reply_text: "YOWL", 111 flickr_search: "lynx bobcats bobcat", 112 alt_text: "A bobcat", 113 fallback_folder: "yowl", 114 }, 115]; 116 117// --- Jetstream types --- 118 119#[derive(Debug, Deserialize)] 120struct JetstreamEvent { 121 did: Option<String>, 122 time_us: Option<u64>, 123 #[allow(dead_code)] 124 kind: Option<String>, 125 commit: Option<JetstreamCommit>, 126} 127 128#[derive(Debug, Deserialize)] 129struct JetstreamCommit { 130 #[allow(dead_code)] 131 rev: Option<String>, 132 operation: Option<String>, 133 collection: Option<String>, 134 rkey: Option<String>, 135 record: Option<serde_json::Value>, 136 cid: Option<String>, 137} 138 139// --- Flickr types --- 140 141#[derive(Debug, Deserialize)] 142struct FlickrPhoto { 143 id: String, 144} 145 146#[derive(Debug, Deserialize)] 147struct FlickrPhotosResult { 148 photo: Vec<FlickrPhoto>, 149} 150 151#[derive(Debug, Deserialize)] 152struct FlickrSearchResponse { 153 photos: FlickrPhotosResult, 154} 155 156#[derive(Debug, Deserialize)] 157struct FlickrSize { 158 source: String, 159 #[allow(dead_code)] 160 label: Option<String>, 161} 162 163#[derive(Debug, Deserialize)] 164struct FlickrSizes { 165 size: Vec<FlickrSize>, 166} 167 168#[derive(Debug, Deserialize)] 169struct FlickrSizesResponse { 170 sizes: FlickrSizes, 171} 172 173// --- Constellation (follower check) --- 174 175#[derive(Debug, Deserialize)] 176struct ConstellationBacklinks { 177 total: u64, 178} 179 180/// Check if `follower_did` follows the bot (`bot_did`) using Constellation's getBacklinks. 181/// Returns true if a follow record exists, false otherwise. 182async fn is_follower_constellation( 183 http: &reqwest::Client, 184 bot_did: &str, 185 follower_did: &str, 186) -> anyhow::Result<bool> { 187 let url = format!( 188 "https://constellation.microcosm.blue/xrpc/blue.microcosm.links.getBacklinks?subject={}&source=app.bsky.graph.follow:subject&did={}&limit=1", 189 urlencoding::encode(bot_did), 190 urlencoding::encode(follower_did), 191 ); 192 193 let resp: ConstellationBacklinks = http.get(&url).send().await?.json().await?; 194 Ok(resp.total > 0) 195} 196 197// --- Follower cache --- 198 199/// In-memory cache of follower DIDs, updated in real-time via the firehose. 200/// Falls back to Constellation API for DIDs not yet seen in the firehose. 201struct FollowerCache { 202 /// DIDs known to be followers 203 followers: RwLock<HashSet<String>>, 204 /// DIDs known to NOT be followers (to avoid repeated Constellation lookups) 205 non_followers: RwLock<HashSet<String>>, 206} 207 208impl FollowerCache { 209 fn new() -> Arc<Self> { 210 Arc::new(Self { 211 followers: RwLock::new(HashSet::new()), 212 non_followers: RwLock::new(HashSet::new()), 213 }) 214 } 215 216 /// Record a new follow from the firehose 217 async fn add_follower(&self, did: &str) { 218 self.non_followers.write().await.remove(did); 219 self.followers.write().await.insert(did.to_string()); 220 info!(did = %did, "Follower cache: added follower"); 221 } 222 223 /// Record an unfollow from the firehose 224 async fn remove_follower(&self, did: &str) { 225 self.followers.write().await.remove(did); 226 self.non_followers.write().await.insert(did.to_string()); 227 info!(did = %did, "Follower cache: removed follower"); 228 } 229 230 /// Check if a DID is a follower, consulting cache first then Constellation. 231 async fn is_follower( 232 &self, 233 http: &reqwest::Client, 234 bot_did: &str, 235 did: &str, 236 ) -> anyhow::Result<bool> { 237 // Check positive cache 238 if self.followers.read().await.contains(did) { 239 return Ok(true); 240 } 241 242 // Check negative cache 243 if self.non_followers.read().await.contains(did) { 244 return Ok(false); 245 } 246 247 // Not in cache — fall back to Constellation and cache the result 248 let result = is_follower_constellation(http, bot_did, did).await?; 249 if result { 250 self.followers.write().await.insert(did.to_string()); 251 } else { 252 self.non_followers.write().await.insert(did.to_string()); 253 } 254 Ok(result) 255 } 256} 257 258// Bluesky API uses serde_json::json! for record construction 259 260// --- Bluesky session --- 261 262#[derive(Debug, Deserialize)] 263struct Session { 264 #[serde(rename = "accessJwt")] 265 access_jwt: String, 266 did: String, 267} 268 269struct BskyClient { 270 http: reqwest::Client, 271 service: String, 272 access_jwt: tokio::sync::RwLock<String>, 273 did: String, 274 handle: String, 275 password: String, 276} 277 278impl BskyClient { 279 async fn login( 280 http: &reqwest::Client, 281 service: &str, 282 handle: &str, 283 password: &str, 284 ) -> anyhow::Result<Session> { 285 let resp = http 286 .post(format!("{}/xrpc/com.atproto.server.createSession", service)) 287 .json(&serde_json::json!({ 288 "identifier": handle, 289 "password": password, 290 })) 291 .send() 292 .await?; 293 294 if !resp.status().is_success() { 295 let status = resp.status(); 296 let body = resp.text().await.unwrap_or_default(); 297 anyhow::bail!("Login failed ({}): {}", status, body); 298 } 299 300 Ok(resp.json::<Session>().await?) 301 } 302 303 async fn new(service: String, handle: String, password: String) -> anyhow::Result<Arc<Self>> { 304 let http = reqwest::Client::new(); 305 let session = Self::login(&http, &service, &handle, &password).await?; 306 info!(did = %session.did, "Logged in to Bluesky"); 307 308 Ok(Arc::new(Self { 309 http, 310 service, 311 access_jwt: tokio::sync::RwLock::new(session.access_jwt), 312 did: session.did, 313 handle, 314 password, 315 })) 316 } 317 318 async fn refresh_session(&self) -> anyhow::Result<()> { 319 let session = Self::login(&self.http, &self.service, &self.handle, &self.password).await?; 320 let mut jwt = self.access_jwt.write().await; 321 *jwt = session.access_jwt; 322 info!("Refreshed Bluesky session"); 323 Ok(()) 324 } 325 326 async fn upload_blob(&self, data: &[u8], mime: &str) -> anyhow::Result<serde_json::Value> { 327 let jwt = self.access_jwt.read().await.clone(); 328 let resp = self 329 .http 330 .post(format!( 331 "{}/xrpc/com.atproto.repo.uploadBlob", 332 self.service 333 )) 334 .header("Authorization", format!("Bearer {}", jwt)) 335 .header("Content-Type", mime) 336 .body(data.to_vec()) 337 .send() 338 .await?; 339 340 if !resp.status().is_success() { 341 let status = resp.status(); 342 let body = resp.text().await.unwrap_or_default(); 343 anyhow::bail!("Upload blob failed ({}): {}", status, body); 344 } 345 346 let body: serde_json::Value = resp.json().await?; 347 Ok(body["blob"].clone()) 348 } 349 350 async fn create_record(&self, request: serde_json::Value) -> anyhow::Result<serde_json::Value> { 351 let jwt = self.access_jwt.read().await.clone(); 352 let resp = self 353 .http 354 .post(format!( 355 "{}/xrpc/com.atproto.repo.createRecord", 356 self.service 357 )) 358 .header("Authorization", format!("Bearer {}", jwt)) 359 .json(&request) 360 .send() 361 .await?; 362 363 if !resp.status().is_success() { 364 let status = resp.status(); 365 let body = resp.text().await.unwrap_or_default(); 366 anyhow::bail!("Create record failed ({}): {}", status, body); 367 } 368 369 Ok(resp.json::<serde_json::Value>().await?) 370 } 371} 372 373// --- Flickr --- 374 375async fn fetch_random_flickr_image( 376 http: &reqwest::Client, 377 flickr_key: &str, 378 search_text: &str, 379) -> anyhow::Result<Vec<u8>> { 380 // Search for photos 381 let search_url = format!( 382 "https://api.flickr.com/services/rest?api_key={}&method=flickr.photos.search&text={}&sort=interestingness-desc&content_type=0&format=json&nojsoncallback=1", 383 flickr_key, 384 urlencoding::encode(search_text) 385 ); 386 387 let resp: FlickrSearchResponse = http.get(&search_url).send().await?.json().await?; 388 389 if resp.photos.photo.is_empty() { 390 anyhow::bail!("No photos found for search: {}", search_text); 391 } 392 393 // Pick a random photo 394 let photo = resp 395 .photos 396 .photo 397 .choose(&mut rand::thread_rng()) 398 .ok_or_else(|| anyhow::anyhow!("Failed to pick random photo"))?; 399 400 // Get photo sizes 401 let sizes_url = format!( 402 "https://api.flickr.com/services/rest?api_key={}&method=flickr.photos.getSizes&format=json&nojsoncallback=1&photo_id={}", 403 flickr_key, photo.id 404 ); 405 406 let sizes_resp: FlickrSizesResponse = http.get(&sizes_url).send().await?.json().await?; 407 408 // Pick a medium-large size (similar to bash: tail -n 4 | head -n 1 picks 4th from end) 409 let sizes = &sizes_resp.sizes.size; 410 if sizes.is_empty() { 411 anyhow::bail!("No sizes available for photo {}", photo.id); 412 } 413 let idx = if sizes.len() >= 4 { 414 sizes.len() - 4 415 } else { 416 0 417 }; 418 let image_url = &sizes[idx].source; 419 420 // Download image 421 let image_bytes = http.get(image_url).send().await?.bytes().await?; 422 423 // Resize if > 1MB (Bluesky blob limit) 424 let mut data = image_bytes.to_vec(); 425 if data.len() > 1_000_000 { 426 info!( 427 size = data.len(), 428 "Image too large, resizing to fit under 1MB" 429 ); 430 data = resize_image(&data, 1_000_000)?; 431 } 432 433 Ok(data) 434} 435 436fn resize_image(data: &[u8], max_bytes: usize) -> anyhow::Result<Vec<u8>> { 437 let img = image::load_from_memory(data)?; 438 439 // Iteratively reduce quality/size 440 let mut quality = 85u8; 441 let mut current = img.clone(); 442 443 loop { 444 let mut buf = Vec::new(); 445 let mut cursor = Cursor::new(&mut buf); 446 current.write_to(&mut cursor, image::ImageFormat::Jpeg)?; 447 448 if buf.len() <= max_bytes || quality <= 10 { 449 return Ok(buf); 450 } 451 452 // Reduce quality or dimensions 453 if quality > 20 { 454 quality -= 10; 455 let mut buf2 = Vec::new(); 456 { 457 let mut encoder = 458 image::codecs::jpeg::JpegEncoder::new_with_quality(&mut buf2, quality); 459 encoder.encode_image(&current)?; 460 } 461 if buf2.len() <= max_bytes { 462 return Ok(buf2); 463 } 464 } 465 466 // Scale down by 75% 467 let new_w = (current.width() * 3) / 4; 468 let new_h = (current.height() * 3) / 4; 469 if new_w < 100 || new_h < 100 { 470 // Just return what we have at lowest quality 471 let mut buf2 = Vec::new(); 472 let mut encoder = image::codecs::jpeg::JpegEncoder::new_with_quality(&mut buf2, 10); 473 encoder.encode_image(&current)?; 474 return Ok(buf2); 475 } 476 current = current.resize(new_w, new_h, image::imageops::FilterType::Lanczos3); 477 } 478} 479 480// --- Fallback images --- 481 482fn pick_random_fallback_image(fallback_dir: &Path, folder: &str) -> anyhow::Result<Vec<u8>> { 483 let dir = fallback_dir.join(folder); 484 let entries: Vec<PathBuf> = std::fs::read_dir(&dir)? 485 .filter_map(|e| e.ok()) 486 .map(|e| e.path()) 487 .filter(|p| { 488 p.extension() 489 .and_then(|ext| ext.to_str()) 490 .map(|ext| matches!(ext.to_lowercase().as_str(), "jpg" | "jpeg" | "png" | "webp")) 491 .unwrap_or(false) 492 }) 493 .collect(); 494 495 if entries.is_empty() { 496 anyhow::bail!("No fallback images in {}", dir.display()); 497 } 498 499 let path = entries 500 .choose(&mut rand::thread_rng()) 501 .ok_or_else(|| anyhow::anyhow!("Failed to pick fallback image"))?; 502 503 let data = std::fs::read(path)?; 504 info!( 505 path = %path.display(), 506 "Using fallback image" 507 ); 508 Ok(data) 509} 510 511// --- Main logic --- 512 513async fn handle_command( 514 bsky: Arc<BskyClient>, 515 flickr_key: String, 516 fallback_dir: PathBuf, 517 post_uri: String, 518 post_cid: String, 519 command: &'static BotCommand, 520) { 521 let http = reqwest::Client::new(); 522 523 // Fetch random image from Flickr, fall back to local images on failure 524 let image_data: Vec<u8> = match fetch_random_flickr_image(&http, &flickr_key, command.flickr_search).await { 525 Ok(data) => data, 526 Err(e) => { 527 warn!( 528 command = command.trigger, 529 uri = %post_uri, 530 error = %e, 531 "Flickr fetch failed, trying fallback images" 532 ); 533 match pick_random_fallback_image(&fallback_dir, command.fallback_folder) { 534 Ok(data) => data, 535 Err(e2) => { 536 error!( 537 command = command.trigger, 538 uri = %post_uri, 539 flickr_error = %e, 540 fallback_error = %e2, 541 "Both Flickr and fallback images failed" 542 ); 543 return; 544 } 545 } 546 } 547 }; 548 549 // Upload blob to Bluesky 550 let blob = match bsky.upload_blob(&image_data, "image/jpeg").await { 551 Ok(b) => b, 552 Err(e) => { 553 // Try refreshing session and retrying once 554 warn!(error = %e, "Blob upload failed, refreshing session and retrying"); 555 if let Err(e2) = bsky.refresh_session().await { 556 error!(error = %e2, "Session refresh failed"); 557 return; 558 } 559 match bsky.upload_blob(&image_data, "image/jpeg").await { 560 Ok(b) => b, 561 Err(e2) => { 562 error!(error = %e2, "Blob upload failed after retry"); 563 return; 564 } 565 } 566 } 567 }; 568 569 // Create reply post 570 let now = chrono_now(); 571 let record = serde_json::json!({ 572 "repo": bsky.did, 573 "collection": "app.bsky.feed.post", 574 "record": { 575 "$type": "app.bsky.feed.post", 576 "text": command.reply_text, 577 "createdAt": now, 578 "reply": { 579 "root": { 580 "uri": post_uri, 581 "cid": post_cid, 582 }, 583 "parent": { 584 "uri": post_uri, 585 "cid": post_cid, 586 } 587 }, 588 "embed": { 589 "$type": "app.bsky.embed.images", 590 "images": [{ 591 "alt": command.alt_text, 592 "image": blob, 593 }] 594 } 595 } 596 }); 597 598 match bsky.create_record(record).await { 599 Ok(_) => { 600 info!( 601 command = command.trigger, 602 uri = %post_uri, 603 "{} reply sent", command.reply_text 604 ); 605 } 606 Err(e) => { 607 // Try refreshing and retrying once 608 warn!(error = %e, "Create record failed, refreshing session"); 609 if let Err(e2) = bsky.refresh_session().await { 610 error!(error = %e2, "Session refresh failed"); 611 return; 612 } 613 // Re-upload blob with new session 614 let blob2 = match bsky.upload_blob(&image_data, "image/jpeg").await { 615 Ok(b) => b, 616 Err(e2) => { 617 error!(error = %e2, "Blob re-upload failed after session refresh"); 618 return; 619 } 620 }; 621 let record2 = serde_json::json!({ 622 "repo": bsky.did, 623 "collection": "app.bsky.feed.post", 624 "record": { 625 "$type": "app.bsky.feed.post", 626 "text": command.reply_text, 627 "createdAt": chrono_now(), 628 "reply": { 629 "root": { 630 "uri": post_uri, 631 "cid": post_cid, 632 }, 633 "parent": { 634 "uri": post_uri, 635 "cid": post_cid, 636 } 637 }, 638 "embed": { 639 "$type": "app.bsky.embed.images", 640 "images": [{ 641 "alt": command.alt_text, 642 "image": blob2, 643 }] 644 } 645 } 646 }); 647 match bsky.create_record(record2).await { 648 Ok(_) => { 649 info!( 650 command = command.trigger, 651 uri = %post_uri, 652 "{} reply sent (after retry)", command.reply_text 653 ); 654 } 655 Err(e2) => { 656 error!( 657 command = command.trigger, 658 uri = %post_uri, 659 error = %e2, 660 "Create record failed after retry" 661 ); 662 } 663 } 664 } 665 } 666} 667 668fn chrono_now() -> String { 669 use std::time::SystemTime; 670 let d = SystemTime::now() 671 .duration_since(SystemTime::UNIX_EPOCH) 672 .unwrap(); 673 let secs = d.as_secs(); 674 let millis = d.subsec_millis(); 675 // Format as ISO 8601 676 let days_since_epoch = secs / 86400; 677 let time_of_day = secs % 86400; 678 let hours = time_of_day / 3600; 679 let minutes = (time_of_day % 3600) / 60; 680 let seconds = time_of_day % 60; 681 682 // Simple date calculation 683 let mut y = 1970i64; 684 let mut remaining = days_since_epoch as i64; 685 loop { 686 let days_in_year = if is_leap(y) { 366 } else { 365 }; 687 if remaining < days_in_year { 688 break; 689 } 690 remaining -= days_in_year; 691 y += 1; 692 } 693 let month_days = if is_leap(y) { 694 [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] 695 } else { 696 [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] 697 }; 698 let mut m = 0; 699 for (i, &days) in month_days.iter().enumerate() { 700 if remaining < days { 701 m = i; 702 break; 703 } 704 remaining -= days; 705 } 706 707 format!( 708 "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z", 709 y, 710 m + 1, 711 remaining + 1, 712 hours, 713 minutes, 714 seconds, 715 millis 716 ) 717} 718 719fn is_leap(y: i64) -> bool { 720 (y % 4 == 0 && y % 100 != 0) || y % 400 == 0 721} 722 723fn find_matching_command(text: &str) -> Option<&'static BotCommand> { 724 let lower = text.to_lowercase(); 725 COMMANDS.iter().find(|cmd| lower.contains(cmd.trigger)) 726} 727 728async fn connect_and_stream( 729 bsky: Arc<BskyClient>, 730 flickr_key: String, 731 fallback_dir: PathBuf, 732 follows_only: bool, 733 semaphore: Arc<Semaphore>, 734 follower_cache: Arc<FollowerCache>, 735) -> anyhow::Result<()> { 736 let jetstream_url = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.graph.follow"; 737 738 info!("Connecting to Jetstream..."); 739 let (ws_stream, _) = tokio_tungstenite::connect_async(jetstream_url).await?; 740 info!("Connected to Jetstream"); 741 742 let (_write, mut read) = ws_stream.split(); 743 744 let mut last_cursor: u64 = 0; 745 746 while let Some(msg) = read.next().await { 747 let msg = match msg { 748 Ok(m) => m, 749 Err(e) => { 750 error!(error = %e, "WebSocket read error"); 751 return Err(e.into()); 752 } 753 }; 754 755 let text = match msg { 756 Message::Text(t) => t, 757 Message::Ping(_) => continue, 758 Message::Pong(_) => continue, 759 Message::Close(_) => { 760 warn!("WebSocket closed by server"); 761 return Ok(()); 762 } 763 _ => continue, 764 }; 765 766 // Parse event 767 let event: JetstreamEvent = match serde_json::from_str(&text) { 768 Ok(e) => e, 769 Err(_) => continue, 770 }; 771 772 // Track cursor for reconnection 773 if let Some(t) = event.time_us { 774 last_cursor = t; 775 } 776 777 // Only care about commit events with create operation on posts 778 let commit = match &event.commit { 779 Some(c) => c, 780 None => continue, 781 }; 782 783 // Handle follow/unfollow events for our follower cache 784 if commit.collection.as_deref() == Some("app.bsky.graph.follow") { 785 if let Some(did) = &event.did { 786 // Check if this follow targets our bot 787 let is_about_us = commit 788 .record 789 .as_ref() 790 .and_then(|r| r.get("subject")) 791 .and_then(|s| s.as_str()) 792 == Some(&bsky.did); 793 794 if is_about_us { 795 match commit.operation.as_deref() { 796 Some("create") => { 797 follower_cache.add_follower(did).await; 798 } 799 Some("delete") => { 800 follower_cache.remove_follower(did).await; 801 } 802 _ => {} 803 } 804 } 805 } 806 continue; 807 } 808 809 if commit.operation.as_deref() != Some("create") { 810 continue; 811 } 812 if commit.collection.as_deref() != Some("app.bsky.feed.post") { 813 continue; 814 } 815 816 // Check the post text for a trigger 817 let record = match &commit.record { 818 Some(r) => r, 819 None => continue, 820 }; 821 822 let post_text = match record.get("text").and_then(|t| t.as_str()) { 823 Some(t) => t, 824 None => continue, 825 }; 826 827 let command = match find_matching_command(post_text) { 828 Some(c) => c, 829 None => continue, 830 }; 831 832 let did = match &event.did { 833 Some(d) => d.clone(), 834 None => continue, 835 }; 836 let rkey = match &commit.rkey { 837 Some(r) => r.clone(), 838 None => continue, 839 }; 840 let cid = match &commit.cid { 841 Some(c) => c.clone(), 842 None => continue, 843 }; 844 845 let post_uri = format!("at://{}/app.bsky.feed.post/{}", did, rkey); 846 847 // Check if the poster follows the bot (if FOLLOWS_ONLY is enabled) 848 if follows_only { 849 match follower_cache.is_follower(&bsky.http, &bsky.did, &did).await { 850 Ok(true) => {} 851 Ok(false) => { 852 info!( 853 command = command.trigger, 854 poster = %did, 855 "Ignoring command from non-follower" 856 ); 857 continue; 858 } 859 Err(e) => { 860 warn!( 861 command = command.trigger, 862 poster = %did, 863 error = %e, 864 "Follower check failed, skipping" 865 ); 866 continue; 867 } 868 } 869 } 870 871 info!( 872 command = command.trigger, 873 uri = %post_uri, 874 "Matched command, queuing reply" 875 ); 876 877 // Spawn bounded task 878 let bsky = bsky.clone(); 879 let flickr_key = flickr_key.clone(); 880 let fallback_dir = fallback_dir.clone(); 881 let permit = semaphore.clone().acquire_owned().await?; 882 883 tokio::spawn(async move { 884 handle_command(bsky, flickr_key, fallback_dir, post_uri, cid, command).await; 885 drop(permit); 886 }); 887 } 888 889 warn!(cursor = last_cursor, "Jetstream stream ended"); 890 Ok(()) 891} 892 893#[tokio::main] 894async fn main() -> anyhow::Result<()> { 895 // Initialize logging 896 tracing_subscriber::fmt() 897 .with_env_filter( 898 tracing_subscriber::EnvFilter::try_from_default_env() 899 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), 900 ) 901 .init(); 902 903 // Load .env 904 dotenvy::dotenv().ok(); 905 906 let bsky_handle = 907 std::env::var("BSKY_HANDLE").expect("BSKY_HANDLE env var required"); 908 let bsky_password = 909 std::env::var("BSKY_PASSWORD").expect("BSKY_PASSWORD env var required"); 910 let flickr_key = 911 std::env::var("FLICKR_KEY").expect("FLICKR_KEY env var required"); 912 let bsky_service = std::env::var("BSKY_SERVICE") 913 .unwrap_or_else(|_| "https://bsky.social".to_string()); 914 let follows_only = std::env::var("FOLLOWS_ONLY") 915 .map(|v| matches!(v.to_lowercase().as_str(), "1" | "true" | "yes")) 916 .unwrap_or(false); 917 918 // Resolve fallback image directory (default: ./fallback_images) 919 let fallback_dir = PathBuf::from( 920 std::env::var("FALLBACK_IMAGE_DIR").unwrap_or_else(|_| "fallback_images".to_string()), 921 ); 922 let fallback_dir = std::fs::canonicalize(&fallback_dir).unwrap_or(fallback_dir); 923 924 info!( 925 handle = %bsky_handle, 926 service = %bsky_service, 927 fallback_dir = %fallback_dir.display(), 928 follows_only = follows_only, 929 commands = COMMANDS.len(), 930 "Starting honkbot" 931 ); 932 933 // Login to Bluesky 934 let bsky: Arc<BskyClient> = BskyClient::new(bsky_service, bsky_handle, bsky_password).await?; 935 936 // Limit concurrent reply tasks (don't flood APIs) 937 let semaphore = Arc::new(Semaphore::new(10)); 938 939 // Follower cache — updated in real-time via firehose follow events 940 let follower_cache = FollowerCache::new(); 941 942 // Main loop with automatic reconnection 943 loop { 944 match connect_and_stream(bsky.clone(), flickr_key.clone(), fallback_dir.clone(), follows_only, semaphore.clone(), follower_cache.clone()).await { 945 Ok(()) => { 946 warn!("Stream ended normally, reconnecting in 5s..."); 947 } 948 Err(e) => { 949 error!(error = %e, "Stream error, reconnecting in 5s..."); 950 } 951 } 952 953 // Refresh session before reconnecting 954 if let Err(e) = bsky.refresh_session().await { 955 error!(error = %e, "Failed to refresh session before reconnect"); 956 } 957 958 tokio::time::sleep(Duration::from_secs(5)).await; 959 } 960}