//! Homebrew RSS reader server
1use anyhow::Result;
2use base64::Engine;
3use sqlx::SqlitePool;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::sync::{Semaphore, broadcast};
7use tokio::task::JoinSet;
8use tracing::{info, warn};
9use url::Url;
10
11use crate::db;
12
13fn resolve_url(base: &str, url: &str) -> Option<String> {
14 Url::parse(url)
15 .or_else(|_| Url::parse(base).and_then(|b| b.join(url)))
16 .ok()
17 .map(|u| u.to_string())
18}
19
/// Run the periodic fetch loop, also listening for manual refresh triggers.
///
/// Ticks every `interval_minutes` (the first `interval.tick()` completes
/// immediately, so a full fetch cycle runs at startup) while concurrently
/// listening on `trigger_rx`: `Some(feed_id)` refreshes one feed, `None`
/// refreshes all feeds. Never returns; cycle errors are logged and the loop
/// keeps going.
pub async fn run(
    pool: SqlitePool,
    interval_minutes: u64,
    mut trigger_rx: broadcast::Receiver<Option<i64>>,
) {
    let mut interval = tokio::time::interval(Duration::from_secs(interval_minutes * 60));

    loop {
        tokio::select! {
            _ = interval.tick() => {
                info!("starting periodic feed fetch cycle");
                if let Err(e) = fetch_all(&pool).await {
                    warn!("periodic feed fetch cycle failed: {e:#}");
                }
            }
            // If recv() yields Err (all senders dropped, or this receiver
            // lagged behind the channel capacity), the `Ok` pattern fails and
            // select! disables this branch for the current iteration; the next
            // loop pass calls recv() again. Lagged messages are skipped
            // silently.
            Ok(trigger) = trigger_rx.recv() => {
                match trigger {
                    Some(feed_id) => {
                        info!(feed_id, "manual refresh triggered for single feed");
                        if let Err(e) = fetch_single(&pool, feed_id).await {
                            warn!(feed_id, "manual feed refresh failed: {e:#}");
                        }
                    }
                    None => {
                        info!("manual refresh triggered for all feeds");
                        if let Err(e) = fetch_all(&pool).await {
                            warn!("manual feed fetch cycle failed: {e:#}");
                        }
                    }
                }
            }
        }
    }
}
55
56async fn fetch_single(pool: &SqlitePool, feed_id: i64) -> Result<()> {
57 let feed = db::get_single_feed_for_fetch(pool, feed_id)
58 .await?
59 .ok_or_else(|| anyhow::anyhow!("feed {feed_id} not found"))?;
60 let client = reqwest::Client::builder()
61 .timeout(Duration::from_secs(30))
62 .build()?;
63 fetch_feed(pool, &client, &feed).await
64}
65
66async fn fetch_all(pool: &SqlitePool) -> Result<()> {
67 let feeds = db::get_all_feeds(pool).await?;
68 let client = reqwest::Client::builder()
69 .timeout(Duration::from_secs(30))
70 .build()?;
71
72 let max_concurrent: usize = std::env::var("SLURP_FETCH_CONCURRENCY")
73 .ok()
74 .and_then(|v| v.parse().ok())
75 .unwrap_or(5);
76
77 let semaphore = (max_concurrent != 0).then(|| Arc::new(Semaphore::new(max_concurrent)));
78
79 let mut join_set = JoinSet::new();
80 for feed in feeds {
81 let pool = pool.clone();
82 let client = client.clone();
83 let semaphore = semaphore.clone();
84 let permit = if let Some(sem) = semaphore {
85 Some(sem.acquire_owned().await.expect("semaphore closed"))
86 } else {
87 None
88 };
89
90 join_set.spawn(async move {
91 let _permit = permit;
92 if let Err(e) = fetch_feed(&pool, &client, &feed).await {
93 warn!(feed_id = feed.id, url = %feed.url, "failed to fetch: {e:#}");
94 }
95 });
96 }
97
98 while let Some(result) = join_set.join_next().await {
99 if let Err(e) = result {
100 warn!("fetch task failed to join: {e}");
101 }
102 }
103
104 Ok(())
105}
106
107async fn fetch_feed(
108 pool: &SqlitePool,
109 client: &reqwest::Client,
110 feed: &db::FetcherFeed,
111) -> Result<()> {
112 info!(feed_id = feed.id, url = %feed.url, "fetching");
113
114 let response = client.get(&feed.url).send().await?.bytes().await?;
115 let parsed = feed_rs::parser::parse(&response[..])?;
116
117 let title = parsed.title.map(|t| t.content).unwrap_or_default();
118 let site_url = parsed
119 .links
120 .iter()
121 .find(|l| l.rel.as_deref() == Some("alternate"))
122 .or_else(|| parsed.links.first())
123 .map(|l| l.href.clone())
124 .unwrap_or_default();
125
126 db::upsert_feed_metadata(pool, feed.id, &title, &site_url).await?;
127
128 // fetch favicon if missing
129 if feed.favicon_id.is_none() {
130 let icon_url = parsed
131 .icon
132 .map(|i| i.uri)
133 .or_else(|| parsed.logo.map(|l| l.uri))
134 .or_else(|| {
135 if site_url.is_empty() {
136 None
137 } else {
138 let base = site_url.trim_end_matches('/');
139 Some(format!("{base}/favicon.ico"))
140 }
141 });
142
143 if let Some(url) = icon_url
144 && let Some(resolved) = resolve_url(&feed.url, &url)
145 {
146 match fetch_favicon(client, &resolved).await {
147 Ok(data) => {
148 let favicon_id = db::insert_favicon(pool, &data).await?;
149 db::set_feed_favicon(pool, feed.id, favicon_id).await?;
150 }
151 Err(e) => warn!(feed_id = feed.id, "favicon fetch failed: {e:#}"),
152 }
153 }
154 }
155
156 for entry in parsed.entries {
157 let guid = entry.id;
158 let entry_title = entry.title.map(|t| t.content).unwrap_or_default();
159 let author = entry
160 .authors
161 .first()
162 .map(|a| a.name.clone())
163 .unwrap_or_default();
164 let url = entry
165 .links
166 .first()
167 .map(|l| l.href.clone())
168 .unwrap_or_default();
169 let content = entry
170 .content
171 .and_then(|c| c.body)
172 .or_else(|| entry.summary.map(|s| s.content))
173 .unwrap_or_default();
174 let published_at = entry.published.or(entry.updated).map(|dt| dt.timestamp());
175
176 db::insert_item(
177 pool,
178 feed.id,
179 &guid,
180 &entry_title,
181 &author,
182 &url,
183 &content,
184 published_at,
185 )
186 .await?;
187 }
188
189 info!(feed_id = feed.id, "done");
190 Ok(())
191}
192
193async fn fetch_favicon(client: &reqwest::Client, url: &str) -> Result<String> {
194 let response = client.get(url).send().await?;
195 let content_type = response
196 .headers()
197 .get("content-type")
198 .and_then(|v| v.to_str().ok())
199 .unwrap_or("image/x-icon")
200 .to_string();
201 let mime = content_type
202 .split(';')
203 .next()
204 .unwrap_or("image/x-icon")
205 .trim();
206 let bytes = response.bytes().await?;
207 let encoded = base64::engine::general_purpose::STANDARD.encode(&bytes);
208 Ok(format!("{mime};base64,{encoded}"))
209}