Homebrew RSS reader server
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 209 lines 6.5 kB view raw
1use anyhow::Result; 2use base64::Engine; 3use sqlx::SqlitePool; 4use std::sync::Arc; 5use std::time::Duration; 6use tokio::sync::{Semaphore, broadcast}; 7use tokio::task::JoinSet; 8use tracing::{info, warn}; 9use url::Url; 10 11use crate::db; 12 13fn resolve_url(base: &str, url: &str) -> Option<String> { 14 Url::parse(url) 15 .or_else(|_| Url::parse(base).and_then(|b| b.join(url))) 16 .ok() 17 .map(|u| u.to_string()) 18} 19 20/// Run the periodic fetch loop, also listening for manual refresh triggers. 21pub async fn run( 22 pool: SqlitePool, 23 interval_minutes: u64, 24 mut trigger_rx: broadcast::Receiver<Option<i64>>, 25) { 26 let mut interval = tokio::time::interval(Duration::from_secs(interval_minutes * 60)); 27 28 loop { 29 tokio::select! { 30 _ = interval.tick() => { 31 info!("starting periodic feed fetch cycle"); 32 if let Err(e) = fetch_all(&pool).await { 33 warn!("periodic feed fetch cycle failed: {e:#}"); 34 } 35 } 36 Ok(trigger) = trigger_rx.recv() => { 37 match trigger { 38 Some(feed_id) => { 39 info!(feed_id, "manual refresh triggered for single feed"); 40 if let Err(e) = fetch_single(&pool, feed_id).await { 41 warn!(feed_id, "manual feed refresh failed: {e:#}"); 42 } 43 } 44 None => { 45 info!("manual refresh triggered for all feeds"); 46 if let Err(e) = fetch_all(&pool).await { 47 warn!("manual feed fetch cycle failed: {e:#}"); 48 } 49 } 50 } 51 } 52 } 53 } 54} 55 56async fn fetch_single(pool: &SqlitePool, feed_id: i64) -> Result<()> { 57 let feed = db::get_single_feed_for_fetch(pool, feed_id) 58 .await? 59 .ok_or_else(|| anyhow::anyhow!("feed {feed_id} not found"))?; 60 let client = reqwest::Client::builder() 61 .timeout(Duration::from_secs(30)) 62 .build()?; 63 fetch_feed(pool, &client, &feed).await 64} 65 66async fn fetch_all(pool: &SqlitePool) -> Result<()> { 67 let feeds = db::get_all_feeds(pool).await?; 68 let client = reqwest::Client::builder() 69 .timeout(Duration::from_secs(30)) 70 .build()?; 71 72 let max_concurrent: usize = std::env::var("SLURP_FETCH_CONCURRENCY") 73 .ok() 74 .and_then(|v| v.parse().ok()) 75 .unwrap_or(5); 76 77 let semaphore = (max_concurrent != 0).then(|| Arc::new(Semaphore::new(max_concurrent))); 78 79 let mut join_set = JoinSet::new(); 80 for feed in feeds { 81 let pool = pool.clone(); 82 let client = client.clone(); 83 let semaphore = semaphore.clone(); 84 let permit = if let Some(sem) = semaphore { 85 Some(sem.acquire_owned().await.expect("semaphore closed")) 86 } else { 87 None 88 }; 89 90 join_set.spawn(async move { 91 let _permit = permit; 92 if let Err(e) = fetch_feed(&pool, &client, &feed).await { 93 warn!(feed_id = feed.id, url = %feed.url, "failed to fetch: {e:#}"); 94 } 95 }); 96 } 97 98 while let Some(result) = join_set.join_next().await { 99 if let Err(e) = result { 100 warn!("fetch task failed to join: {e}"); 101 } 102 } 103 104 Ok(()) 105} 106 107async fn fetch_feed( 108 pool: &SqlitePool, 109 client: &reqwest::Client, 110 feed: &db::FetcherFeed, 111) -> Result<()> { 112 info!(feed_id = feed.id, url = %feed.url, "fetching"); 113 114 let response = client.get(&feed.url).send().await?.bytes().await?; 115 let parsed = feed_rs::parser::parse(&response[..])?; 116 117 let title = parsed.title.map(|t| t.content).unwrap_or_default(); 118 let site_url = parsed 119 .links 120 .iter() 121 .find(|l| l.rel.as_deref() == Some("alternate")) 122 .or_else(|| parsed.links.first()) 123 .map(|l| l.href.clone()) 124 .unwrap_or_default(); 125 126 db::upsert_feed_metadata(pool, feed.id, &title, &site_url).await?; 127 128 // fetch favicon if missing 129 if feed.favicon_id.is_none() { 130 let icon_url = parsed 131 .icon 132 .map(|i| i.uri) 133 .or_else(|| parsed.logo.map(|l| l.uri)) 134 .or_else(|| { 135 if site_url.is_empty() { 136 None 137 } else { 138 let base = site_url.trim_end_matches('/'); 139 Some(format!("{base}/favicon.ico")) 140 } 141 }); 142 143 if let Some(url) = icon_url 144 && let Some(resolved) = resolve_url(&feed.url, &url) 145 { 146 match fetch_favicon(client, &resolved).await { 147 Ok(data) => { 148 let favicon_id = db::insert_favicon(pool, &data).await?; 149 db::set_feed_favicon(pool, feed.id, favicon_id).await?; 150 } 151 Err(e) => warn!(feed_id = feed.id, "favicon fetch failed: {e:#}"), 152 } 153 } 154 } 155 156 for entry in parsed.entries { 157 let guid = entry.id; 158 let entry_title = entry.title.map(|t| t.content).unwrap_or_default(); 159 let author = entry 160 .authors 161 .first() 162 .map(|a| a.name.clone()) 163 .unwrap_or_default(); 164 let url = entry 165 .links 166 .first() 167 .map(|l| l.href.clone()) 168 .unwrap_or_default(); 169 let content = entry 170 .content 171 .and_then(|c| c.body) 172 .or_else(|| entry.summary.map(|s| s.content)) 173 .unwrap_or_default(); 174 let published_at = entry.published.or(entry.updated).map(|dt| dt.timestamp()); 175 176 db::insert_item( 177 pool, 178 feed.id, 179 &guid, 180 &entry_title, 181 &author, 182 &url, 183 &content, 184 published_at, 185 ) 186 .await?; 187 } 188 189 info!(feed_id = feed.id, "done"); 190 Ok(()) 191} 192 193async fn fetch_favicon(client: &reqwest::Client, url: &str) -> Result<String> { 194 let response = client.get(url).send().await?; 195 let content_type = response 196 .headers() 197 .get("content-type") 198 .and_then(|v| v.to_str().ok()) 199 .unwrap_or("image/x-icon") 200 .to_string(); 201 let mime = content_type 202 .split(';') 203 .next() 204 .unwrap_or("image/x-icon") 205 .trim(); 206 let bytes = response.bytes().await?; 207 let encoded = base64::engine::general_purpose::STANDARD.encode(&bytes); 208 Ok(format!("{mime};base64,{encoded}")) 209}