use anyhow::Result; use base64::Engine; use sqlx::SqlitePool; use std::sync::Arc; use std::time::Duration; use tokio::sync::{Semaphore, broadcast}; use tokio::task::JoinSet; use tracing::{info, warn}; use url::Url; use crate::db; fn resolve_url(base: &str, url: &str) -> Option { Url::parse(url) .or_else(|_| Url::parse(base).and_then(|b| b.join(url))) .ok() .map(|u| u.to_string()) } /// Run the periodic fetch loop, also listening for manual refresh triggers. pub async fn run( pool: SqlitePool, interval_minutes: u64, mut trigger_rx: broadcast::Receiver>, ) { let mut interval = tokio::time::interval(Duration::from_secs(interval_minutes * 60)); loop { tokio::select! { _ = interval.tick() => { info!("starting periodic feed fetch cycle"); if let Err(e) = fetch_all(&pool).await { warn!("periodic feed fetch cycle failed: {e:#}"); } } Ok(trigger) = trigger_rx.recv() => { match trigger { Some(feed_id) => { info!(feed_id, "manual refresh triggered for single feed"); if let Err(e) = fetch_single(&pool, feed_id).await { warn!(feed_id, "manual feed refresh failed: {e:#}"); } } None => { info!("manual refresh triggered for all feeds"); if let Err(e) = fetch_all(&pool).await { warn!("manual feed fetch cycle failed: {e:#}"); } } } } } } } async fn fetch_single(pool: &SqlitePool, feed_id: i64) -> Result<()> { let feed = db::get_single_feed_for_fetch(pool, feed_id) .await? .ok_or_else(|| anyhow::anyhow!("feed {feed_id} not found"))?; let client = reqwest::Client::builder() .timeout(Duration::from_secs(30)) .build()?; fetch_feed(pool, &client, &feed).await } async fn fetch_all(pool: &SqlitePool) -> Result<()> { let feeds = db::get_all_feeds(pool).await?; let client = reqwest::Client::builder() .timeout(Duration::from_secs(30)) .build()?; let max_concurrent: usize = std::env::var("SLURP_FETCH_CONCURRENCY") .ok() .and_then(|v| v.parse().ok()) .unwrap_or(5); let semaphore = (max_concurrent != 0).then(|| Arc::new(Semaphore::new(max_concurrent))); let mut join_set = JoinSet::new(); for feed in feeds { let pool = pool.clone(); let client = client.clone(); let semaphore = semaphore.clone(); let permit = if let Some(sem) = semaphore { Some(sem.acquire_owned().await.expect("semaphore closed")) } else { None }; join_set.spawn(async move { let _permit = permit; if let Err(e) = fetch_feed(&pool, &client, &feed).await { warn!(feed_id = feed.id, url = %feed.url, "failed to fetch: {e:#}"); } }); } while let Some(result) = join_set.join_next().await { if let Err(e) = result { warn!("fetch task failed to join: {e}"); } } Ok(()) } async fn fetch_feed( pool: &SqlitePool, client: &reqwest::Client, feed: &db::FetcherFeed, ) -> Result<()> { info!(feed_id = feed.id, url = %feed.url, "fetching"); let response = client.get(&feed.url).send().await?.bytes().await?; let parsed = feed_rs::parser::parse(&response[..])?; let title = parsed.title.map(|t| t.content).unwrap_or_default(); let site_url = parsed .links .iter() .find(|l| l.rel.as_deref() == Some("alternate")) .or_else(|| parsed.links.first()) .map(|l| l.href.clone()) .unwrap_or_default(); db::upsert_feed_metadata(pool, feed.id, &title, &site_url).await?; // fetch favicon if missing if feed.favicon_id.is_none() { let icon_url = parsed .icon .map(|i| i.uri) .or_else(|| parsed.logo.map(|l| l.uri)) .or_else(|| { if site_url.is_empty() { None } else { let base = site_url.trim_end_matches('/'); Some(format!("{base}/favicon.ico")) } }); if let Some(url) = icon_url && let Some(resolved) = resolve_url(&feed.url, &url) { match fetch_favicon(client, &resolved).await { Ok(data) => { let favicon_id = db::insert_favicon(pool, &data).await?; db::set_feed_favicon(pool, feed.id, favicon_id).await?; } Err(e) => warn!(feed_id = feed.id, "favicon fetch failed: {e:#}"), } } } for entry in parsed.entries { let guid = entry.id; let entry_title = entry.title.map(|t| t.content).unwrap_or_default(); let author = entry .authors .first() .map(|a| a.name.clone()) .unwrap_or_default(); let url = entry .links .first() .map(|l| l.href.clone()) .unwrap_or_default(); let content = entry .content .and_then(|c| c.body) .or_else(|| entry.summary.map(|s| s.content)) .unwrap_or_default(); let published_at = entry.published.or(entry.updated).map(|dt| dt.timestamp()); db::insert_item( pool, feed.id, &guid, &entry_title, &author, &url, &content, published_at, ) .await?; } info!(feed_id = feed.id, "done"); Ok(()) } async fn fetch_favicon(client: &reqwest::Client, url: &str) -> Result { let response = client.get(url).send().await?; let content_type = response .headers() .get("content-type") .and_then(|v| v.to_str().ok()) .unwrap_or("image/x-icon") .to_string(); let mime = content_type .split(';') .next() .unwrap_or("image/x-icon") .trim(); let bytes = response.bytes().await?; let encoded = base64::engine::general_purpose::STANDARD.encode(&bytes); Ok(format!("{mime};base64,{encoded}")) }