use futures_util::StreamExt; use rand::seq::SliceRandom; use serde::Deserialize; use std::collections::HashSet; use std::io::Cursor; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use tokio::sync::{RwLock, Semaphore}; use tokio_tungstenite::tungstenite::Message; use tracing::{error, info, warn}; // --- Command configuration --- struct BotCommand { trigger: &'static str, // e.g. "/honk" reply_text: &'static str, flickr_search: &'static str, alt_text: &'static str, fallback_folder: &'static str, // e.g. "honk" } const COMMANDS: &[BotCommand] = &[ BotCommand { trigger: "/honk", reply_text: "HONK", flickr_search: "goose geese bird", alt_text: "A goose", fallback_folder: "honk", }, BotCommand { trigger: "/awoo", reply_text: "AWOO", flickr_search: "pup wolves wolf", alt_text: "A wolf", fallback_folder: "awoo", }, BotCommand { trigger: "/baaa", reply_text: "BAAA", flickr_search: "kid goats goat", alt_text: "A goat", fallback_folder: "baaa", }, BotCommand { trigger: "/bork", reply_text: "BORK", flickr_search: "puppy puppies dog", alt_text: "A dog", fallback_folder: "bork", }, BotCommand { trigger: "/hiss", reply_text: "HISS", flickr_search: "snake reptile hiss", alt_text: "A snake", fallback_folder: "hiss", }, BotCommand { trigger: "/hoot", reply_text: "HOOT", flickr_search: "owlet owls owl", alt_text: "An owl", fallback_folder: "hoot", }, BotCommand { trigger: "/meehh", reply_text: "MEEHH", flickr_search: "lamb sheep lambs", alt_text: "A lamb", fallback_folder: "meehh", }, BotCommand { trigger: "/meow", reply_text: "MEOW", flickr_search: "kitty kitties cat", alt_text: "A cat", fallback_folder: "meow", }, BotCommand { trigger: "/mumble", reply_text: "MUMBLE", flickr_search: "marmot marmots rodent nature", alt_text: "A marmot", fallback_folder: "mumble", }, BotCommand { trigger: "/oink", reply_text: "OINK", flickr_search: "pig piggy oink farm", alt_text: "A pig", fallback_folder: "oink", }, BotCommand { trigger: "/ribbit", reply_text: "RIBBIT", flickr_search: "frog frogs amphibian nature", alt_text: "A frog", fallback_folder: "ribbit", }, BotCommand { trigger: "/squee", reply_text: "SQUEE", flickr_search: "capybaras rodent capybaras nature", alt_text: "A capybara", fallback_folder: "squee", }, BotCommand { trigger: "/yowl", reply_text: "YOWL", flickr_search: "lynx bobcats bobcat", alt_text: "A bobcat", fallback_folder: "yowl", }, ]; // --- Jetstream types --- #[derive(Debug, Deserialize)] struct JetstreamEvent { did: Option, time_us: Option, #[allow(dead_code)] kind: Option, commit: Option, } #[derive(Debug, Deserialize)] struct JetstreamCommit { #[allow(dead_code)] rev: Option, operation: Option, collection: Option, rkey: Option, record: Option, cid: Option, } // --- Flickr types --- #[derive(Debug, Deserialize)] struct FlickrPhoto { id: String, } #[derive(Debug, Deserialize)] struct FlickrPhotosResult { photo: Vec, } #[derive(Debug, Deserialize)] struct FlickrSearchResponse { photos: FlickrPhotosResult, } #[derive(Debug, Deserialize)] struct FlickrSize { source: String, #[allow(dead_code)] label: Option, } #[derive(Debug, Deserialize)] struct FlickrSizes { size: Vec, } #[derive(Debug, Deserialize)] struct FlickrSizesResponse { sizes: FlickrSizes, } // --- Constellation (follower check) --- #[derive(Debug, Deserialize)] struct ConstellationBacklinks { total: u64, } /// Check if `follower_did` follows the bot (`bot_did`) using Constellation's getBacklinks. /// Returns true if a follow record exists, false otherwise. async fn is_follower_constellation( http: &reqwest::Client, bot_did: &str, follower_did: &str, ) -> anyhow::Result { let url = format!( "https://constellation.microcosm.blue/xrpc/blue.microcosm.links.getBacklinks?subject={}&source=app.bsky.graph.follow:subject&did={}&limit=1", urlencoding::encode(bot_did), urlencoding::encode(follower_did), ); let resp: ConstellationBacklinks = http.get(&url).send().await?.json().await?; Ok(resp.total > 0) } // --- Follower cache --- /// In-memory cache of follower DIDs, updated in real-time via the firehose. /// Falls back to Constellation API for DIDs not yet seen in the firehose. struct FollowerCache { /// DIDs known to be followers followers: RwLock>, /// DIDs known to NOT be followers (to avoid repeated Constellation lookups) non_followers: RwLock>, } impl FollowerCache { fn new() -> Arc { Arc::new(Self { followers: RwLock::new(HashSet::new()), non_followers: RwLock::new(HashSet::new()), }) } /// Record a new follow from the firehose async fn add_follower(&self, did: &str) { self.non_followers.write().await.remove(did); self.followers.write().await.insert(did.to_string()); info!(did = %did, "Follower cache: added follower"); } /// Record an unfollow from the firehose async fn remove_follower(&self, did: &str) { self.followers.write().await.remove(did); self.non_followers.write().await.insert(did.to_string()); info!(did = %did, "Follower cache: removed follower"); } /// Check if a DID is a follower, consulting cache first then Constellation. async fn is_follower( &self, http: &reqwest::Client, bot_did: &str, did: &str, ) -> anyhow::Result { // Check positive cache if self.followers.read().await.contains(did) { return Ok(true); } // Check negative cache if self.non_followers.read().await.contains(did) { return Ok(false); } // Not in cache — fall back to Constellation and cache the result let result = is_follower_constellation(http, bot_did, did).await?; if result { self.followers.write().await.insert(did.to_string()); } else { self.non_followers.write().await.insert(did.to_string()); } Ok(result) } } // Bluesky API uses serde_json::json! for record construction // --- Bluesky session --- #[derive(Debug, Deserialize)] struct Session { #[serde(rename = "accessJwt")] access_jwt: String, did: String, } struct BskyClient { http: reqwest::Client, service: String, access_jwt: tokio::sync::RwLock, did: String, handle: String, password: String, } impl BskyClient { async fn login( http: &reqwest::Client, service: &str, handle: &str, password: &str, ) -> anyhow::Result { let resp = http .post(format!("{}/xrpc/com.atproto.server.createSession", service)) .json(&serde_json::json!({ "identifier": handle, "password": password, })) .send() .await?; if !resp.status().is_success() { let status = resp.status(); let body = resp.text().await.unwrap_or_default(); anyhow::bail!("Login failed ({}): {}", status, body); } Ok(resp.json::().await?) } async fn new(service: String, handle: String, password: String) -> anyhow::Result> { let http = reqwest::Client::new(); let session = Self::login(&http, &service, &handle, &password).await?; info!(did = %session.did, "Logged in to Bluesky"); Ok(Arc::new(Self { http, service, access_jwt: tokio::sync::RwLock::new(session.access_jwt), did: session.did, handle, password, })) } async fn refresh_session(&self) -> anyhow::Result<()> { let session = Self::login(&self.http, &self.service, &self.handle, &self.password).await?; let mut jwt = self.access_jwt.write().await; *jwt = session.access_jwt; info!("Refreshed Bluesky session"); Ok(()) } async fn upload_blob(&self, data: &[u8], mime: &str) -> anyhow::Result { let jwt = self.access_jwt.read().await.clone(); let resp = self .http .post(format!( "{}/xrpc/com.atproto.repo.uploadBlob", self.service )) .header("Authorization", format!("Bearer {}", jwt)) .header("Content-Type", mime) .body(data.to_vec()) .send() .await?; if !resp.status().is_success() { let status = resp.status(); let body = resp.text().await.unwrap_or_default(); anyhow::bail!("Upload blob failed ({}): {}", status, body); } let body: serde_json::Value = resp.json().await?; Ok(body["blob"].clone()) } async fn create_record(&self, request: serde_json::Value) -> anyhow::Result { let jwt = self.access_jwt.read().await.clone(); let resp = self .http .post(format!( "{}/xrpc/com.atproto.repo.createRecord", self.service )) .header("Authorization", format!("Bearer {}", jwt)) .json(&request) .send() .await?; if !resp.status().is_success() { let status = resp.status(); let body = resp.text().await.unwrap_or_default(); anyhow::bail!("Create record failed ({}): {}", status, body); } Ok(resp.json::().await?) } } // --- Flickr --- async fn fetch_random_flickr_image( http: &reqwest::Client, flickr_key: &str, search_text: &str, ) -> anyhow::Result> { // Search for photos let search_url = format!( "https://api.flickr.com/services/rest?api_key={}&method=flickr.photos.search&text={}&sort=interestingness-desc&content_type=0&format=json&nojsoncallback=1", flickr_key, urlencoding::encode(search_text) ); let resp: FlickrSearchResponse = http.get(&search_url).send().await?.json().await?; if resp.photos.photo.is_empty() { anyhow::bail!("No photos found for search: {}", search_text); } // Pick a random photo let photo = resp .photos .photo .choose(&mut rand::thread_rng()) .ok_or_else(|| anyhow::anyhow!("Failed to pick random photo"))?; // Get photo sizes let sizes_url = format!( "https://api.flickr.com/services/rest?api_key={}&method=flickr.photos.getSizes&format=json&nojsoncallback=1&photo_id={}", flickr_key, photo.id ); let sizes_resp: FlickrSizesResponse = http.get(&sizes_url).send().await?.json().await?; // Pick a medium-large size (similar to bash: tail -n 4 | head -n 1 picks 4th from end) let sizes = &sizes_resp.sizes.size; if sizes.is_empty() { anyhow::bail!("No sizes available for photo {}", photo.id); } let idx = if sizes.len() >= 4 { sizes.len() - 4 } else { 0 }; let image_url = &sizes[idx].source; // Download image let image_bytes = http.get(image_url).send().await?.bytes().await?; // Resize if > 1MB (Bluesky blob limit) let mut data = image_bytes.to_vec(); if data.len() > 1_000_000 { info!( size = data.len(), "Image too large, resizing to fit under 1MB" ); data = resize_image(&data, 1_000_000)?; } Ok(data) } fn resize_image(data: &[u8], max_bytes: usize) -> anyhow::Result> { let img = image::load_from_memory(data)?; // Iteratively reduce quality/size let mut quality = 85u8; let mut current = img.clone(); loop { let mut buf = Vec::new(); let mut cursor = Cursor::new(&mut buf); current.write_to(&mut cursor, image::ImageFormat::Jpeg)?; if buf.len() <= max_bytes || quality <= 10 { return Ok(buf); } // Reduce quality or dimensions if quality > 20 { quality -= 10; let mut buf2 = Vec::new(); { let mut encoder = image::codecs::jpeg::JpegEncoder::new_with_quality(&mut buf2, quality); encoder.encode_image(¤t)?; } if buf2.len() <= max_bytes { return Ok(buf2); } } // Scale down by 75% let new_w = (current.width() * 3) / 4; let new_h = (current.height() * 3) / 4; if new_w < 100 || new_h < 100 { // Just return what we have at lowest quality let mut buf2 = Vec::new(); let mut encoder = image::codecs::jpeg::JpegEncoder::new_with_quality(&mut buf2, 10); encoder.encode_image(¤t)?; return Ok(buf2); } current = current.resize(new_w, new_h, image::imageops::FilterType::Lanczos3); } } // --- Fallback images --- fn pick_random_fallback_image(fallback_dir: &Path, folder: &str) -> anyhow::Result> { let dir = fallback_dir.join(folder); let entries: Vec = std::fs::read_dir(&dir)? .filter_map(|e| e.ok()) .map(|e| e.path()) .filter(|p| { p.extension() .and_then(|ext| ext.to_str()) .map(|ext| matches!(ext.to_lowercase().as_str(), "jpg" | "jpeg" | "png" | "webp")) .unwrap_or(false) }) .collect(); if entries.is_empty() { anyhow::bail!("No fallback images in {}", dir.display()); } let path = entries .choose(&mut rand::thread_rng()) .ok_or_else(|| anyhow::anyhow!("Failed to pick fallback image"))?; let data = std::fs::read(path)?; info!( path = %path.display(), "Using fallback image" ); Ok(data) } // --- Main logic --- async fn handle_command( bsky: Arc, flickr_key: String, fallback_dir: PathBuf, post_uri: String, post_cid: String, command: &'static BotCommand, ) { let http = reqwest::Client::new(); // Fetch random image from Flickr, fall back to local images on failure let image_data: Vec = match fetch_random_flickr_image(&http, &flickr_key, command.flickr_search).await { Ok(data) => data, Err(e) => { warn!( command = command.trigger, uri = %post_uri, error = %e, "Flickr fetch failed, trying fallback images" ); match pick_random_fallback_image(&fallback_dir, command.fallback_folder) { Ok(data) => data, Err(e2) => { error!( command = command.trigger, uri = %post_uri, flickr_error = %e, fallback_error = %e2, "Both Flickr and fallback images failed" ); return; } } } }; // Upload blob to Bluesky let blob = match bsky.upload_blob(&image_data, "image/jpeg").await { Ok(b) => b, Err(e) => { // Try refreshing session and retrying once warn!(error = %e, "Blob upload failed, refreshing session and retrying"); if let Err(e2) = bsky.refresh_session().await { error!(error = %e2, "Session refresh failed"); return; } match bsky.upload_blob(&image_data, "image/jpeg").await { Ok(b) => b, Err(e2) => { error!(error = %e2, "Blob upload failed after retry"); return; } } } }; // Create reply post let now = chrono_now(); let record = serde_json::json!({ "repo": bsky.did, "collection": "app.bsky.feed.post", "record": { "$type": "app.bsky.feed.post", "text": command.reply_text, "createdAt": now, "reply": { "root": { "uri": post_uri, "cid": post_cid, }, "parent": { "uri": post_uri, "cid": post_cid, } }, "embed": { "$type": "app.bsky.embed.images", "images": [{ "alt": command.alt_text, "image": blob, }] } } }); match bsky.create_record(record).await { Ok(_) => { info!( command = command.trigger, uri = %post_uri, "{} reply sent", command.reply_text ); } Err(e) => { // Try refreshing and retrying once warn!(error = %e, "Create record failed, refreshing session"); if let Err(e2) = bsky.refresh_session().await { error!(error = %e2, "Session refresh failed"); return; } // Re-upload blob with new session let blob2 = match bsky.upload_blob(&image_data, "image/jpeg").await { Ok(b) => b, Err(e2) => { error!(error = %e2, "Blob re-upload failed after session refresh"); return; } }; let record2 = serde_json::json!({ "repo": bsky.did, "collection": "app.bsky.feed.post", "record": { "$type": "app.bsky.feed.post", "text": command.reply_text, "createdAt": chrono_now(), "reply": { "root": { "uri": post_uri, "cid": post_cid, }, "parent": { "uri": post_uri, "cid": post_cid, } }, "embed": { "$type": "app.bsky.embed.images", "images": [{ "alt": command.alt_text, "image": blob2, }] } } }); match bsky.create_record(record2).await { Ok(_) => { info!( command = command.trigger, uri = %post_uri, "{} reply sent (after retry)", command.reply_text ); } Err(e2) => { error!( command = command.trigger, uri = %post_uri, error = %e2, "Create record failed after retry" ); } } } } } fn chrono_now() -> String { use std::time::SystemTime; let d = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap(); let secs = d.as_secs(); let millis = d.subsec_millis(); // Format as ISO 8601 let days_since_epoch = secs / 86400; let time_of_day = secs % 86400; let hours = time_of_day / 3600; let minutes = (time_of_day % 3600) / 60; let seconds = time_of_day % 60; // Simple date calculation let mut y = 1970i64; let mut remaining = days_since_epoch as i64; loop { let days_in_year = if is_leap(y) { 366 } else { 365 }; if remaining < days_in_year { break; } remaining -= days_in_year; y += 1; } let month_days = if is_leap(y) { [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] } else { [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] }; let mut m = 0; for (i, &days) in month_days.iter().enumerate() { if remaining < days { m = i; break; } remaining -= days; } format!( "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z", y, m + 1, remaining + 1, hours, minutes, seconds, millis ) } fn is_leap(y: i64) -> bool { (y % 4 == 0 && y % 100 != 0) || y % 400 == 0 } fn find_matching_command(text: &str) -> Option<&'static BotCommand> { let lower = text.to_lowercase(); COMMANDS.iter().find(|cmd| lower.contains(cmd.trigger)) } async fn connect_and_stream( bsky: Arc, flickr_key: String, fallback_dir: PathBuf, follows_only: bool, semaphore: Arc, follower_cache: Arc, ) -> anyhow::Result<()> { let jetstream_url = "wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.graph.follow"; info!("Connecting to Jetstream..."); let (ws_stream, _) = tokio_tungstenite::connect_async(jetstream_url).await?; info!("Connected to Jetstream"); let (_write, mut read) = ws_stream.split(); let mut last_cursor: u64 = 0; while let Some(msg) = read.next().await { let msg = match msg { Ok(m) => m, Err(e) => { error!(error = %e, "WebSocket read error"); return Err(e.into()); } }; let text = match msg { Message::Text(t) => t, Message::Ping(_) => continue, Message::Pong(_) => continue, Message::Close(_) => { warn!("WebSocket closed by server"); return Ok(()); } _ => continue, }; // Parse event let event: JetstreamEvent = match serde_json::from_str(&text) { Ok(e) => e, Err(_) => continue, }; // Track cursor for reconnection if let Some(t) = event.time_us { last_cursor = t; } // Only care about commit events with create operation on posts let commit = match &event.commit { Some(c) => c, None => continue, }; // Handle follow/unfollow events for our follower cache if commit.collection.as_deref() == Some("app.bsky.graph.follow") { if let Some(did) = &event.did { // Check if this follow targets our bot let is_about_us = commit .record .as_ref() .and_then(|r| r.get("subject")) .and_then(|s| s.as_str()) == Some(&bsky.did); if is_about_us { match commit.operation.as_deref() { Some("create") => { follower_cache.add_follower(did).await; } Some("delete") => { follower_cache.remove_follower(did).await; } _ => {} } } } continue; } if commit.operation.as_deref() != Some("create") { continue; } if commit.collection.as_deref() != Some("app.bsky.feed.post") { continue; } // Check the post text for a trigger let record = match &commit.record { Some(r) => r, None => continue, }; let post_text = match record.get("text").and_then(|t| t.as_str()) { Some(t) => t, None => continue, }; let command = match find_matching_command(post_text) { Some(c) => c, None => continue, }; let did = match &event.did { Some(d) => d.clone(), None => continue, }; let rkey = match &commit.rkey { Some(r) => r.clone(), None => continue, }; let cid = match &commit.cid { Some(c) => c.clone(), None => continue, }; let post_uri = format!("at://{}/app.bsky.feed.post/{}", did, rkey); // Check if the poster follows the bot (if FOLLOWS_ONLY is enabled) if follows_only { match follower_cache.is_follower(&bsky.http, &bsky.did, &did).await { Ok(true) => {} Ok(false) => { info!( command = command.trigger, poster = %did, "Ignoring command from non-follower" ); continue; } Err(e) => { warn!( command = command.trigger, poster = %did, error = %e, "Follower check failed, skipping" ); continue; } } } info!( command = command.trigger, uri = %post_uri, "Matched command, queuing reply" ); // Spawn bounded task let bsky = bsky.clone(); let flickr_key = flickr_key.clone(); let fallback_dir = fallback_dir.clone(); let permit = semaphore.clone().acquire_owned().await?; tokio::spawn(async move { handle_command(bsky, flickr_key, fallback_dir, post_uri, cid, command).await; drop(permit); }); } warn!(cursor = last_cursor, "Jetstream stream ended"); Ok(()) } #[tokio::main] async fn main() -> anyhow::Result<()> { // Initialize logging tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), ) .init(); // Load .env dotenvy::dotenv().ok(); let bsky_handle = std::env::var("BSKY_HANDLE").expect("BSKY_HANDLE env var required"); let bsky_password = std::env::var("BSKY_PASSWORD").expect("BSKY_PASSWORD env var required"); let flickr_key = std::env::var("FLICKR_KEY").expect("FLICKR_KEY env var required"); let bsky_service = std::env::var("BSKY_SERVICE") .unwrap_or_else(|_| "https://bsky.social".to_string()); let follows_only = std::env::var("FOLLOWS_ONLY") .map(|v| matches!(v.to_lowercase().as_str(), "1" | "true" | "yes")) .unwrap_or(false); // Resolve fallback image directory (default: ./fallback_images) let fallback_dir = PathBuf::from( std::env::var("FALLBACK_IMAGE_DIR").unwrap_or_else(|_| "fallback_images".to_string()), ); let fallback_dir = std::fs::canonicalize(&fallback_dir).unwrap_or(fallback_dir); info!( handle = %bsky_handle, service = %bsky_service, fallback_dir = %fallback_dir.display(), follows_only = follows_only, commands = COMMANDS.len(), "Starting honkbot" ); // Login to Bluesky let bsky: Arc = BskyClient::new(bsky_service, bsky_handle, bsky_password).await?; // Limit concurrent reply tasks (don't flood APIs) let semaphore = Arc::new(Semaphore::new(10)); // Follower cache — updated in real-time via firehose follow events let follower_cache = FollowerCache::new(); // Main loop with automatic reconnection loop { match connect_and_stream(bsky.clone(), flickr_key.clone(), fallback_dir.clone(), follows_only, semaphore.clone(), follower_cache.clone()).await { Ok(()) => { warn!("Stream ended normally, reconnecting in 5s..."); } Err(e) => { error!(error = %e, "Stream error, reconnecting in 5s..."); } } // Refresh session before reconnecting if let Err(e) = bsky.refresh_session().await { error!(error = %e, "Failed to refresh session before reconnect"); } tokio::time::sleep(Duration::from_secs(5)).await; } }