Homebrew RSS reader server
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fetcher: add concurrent feed fetching with bounded parallelism

* uses tokio JoinSet and Semaphore for bounded concurrency
* SLURP_FETCH_CONCURRENCY env var sets the limit (default: 5, also used when the value is not a valid non-negative integer)
* setting SLURP_FETCH_CONCURRENCY to 0 removes the limit, so all feeds are fetched in parallel

Sequential fetching was slow when subscribed to many feeds.

+33 -11
+33 -11
src/fetcher.rs
··· 1 1 use anyhow::Result; 2 2 use base64::Engine; 3 3 use sqlx::SqlitePool; 4 + use std::sync::Arc; 4 5 use std::time::Duration; 6 + use tokio::sync::Semaphore; 7 + use tokio::task::JoinSet; 5 8 use tracing::{info, warn}; 6 9 use url::Url; 7 10 ··· 32 35 .timeout(Duration::from_secs(30)) 33 36 .build()?; 34 37 38 + let max_concurrent: usize = std::env::var("SLURP_FETCH_CONCURRENCY") 39 + .ok() 40 + .and_then(|v| v.parse().ok()) 41 + .unwrap_or(5); 42 + 43 + let semaphore = (max_concurrent != 0).then(|| Arc::new(Semaphore::new(max_concurrent))); 44 + 45 + let mut join_set = JoinSet::new(); 35 46 for feed in feeds { 36 - if let Err(e) = fetch_feed(pool, &client, &feed).await { 37 - warn!(feed_id = feed.id, url = %feed.url, "failed to fetch: {e:#}"); 47 + let pool = pool.clone(); 48 + let client = client.clone(); 49 + let semaphore = semaphore.clone(); 50 + let permit = if let Some(sem) = semaphore { 51 + Some(sem.acquire_owned().await.expect("semaphore closed")) 52 + } else { 53 + None 54 + }; 55 + 56 + join_set.spawn(async move { 57 + let _permit = permit; 58 + if let Err(e) = fetch_feed(&pool, &client, &feed).await { 59 + warn!(feed_id = feed.id, url = %feed.url, "failed to fetch: {e:#}"); 60 + } 61 + }); 62 + } 63 + 64 + while let Some(result) = join_set.join_next().await { 65 + if let Err(e) = result { 66 + warn!("fetch task failed to join: {e}"); 38 67 } 39 68 } 40 69 41 70 Ok(()) 42 71 } 43 72 44 - async fn fetch_feed( 45 - pool: &SqlitePool, 46 - client: &reqwest::Client, 47 - feed: &db::FeedRow, 48 - ) -> Result<()> { 73 + async fn fetch_feed(pool: &SqlitePool, client: &reqwest::Client, feed: &db::FeedRow) -> Result<()> { 49 74 info!(feed_id = feed.id, url = %feed.url, "fetching"); 50 75 51 76 let response = client.get(&feed.url).send().await?.bytes().await?; ··· 108 133 .and_then(|c| c.body) 109 134 .or_else(|| entry.summary.map(|s| s.content)) 110 135 .unwrap_or_default(); 111 - let published_at = entry 112 - .published 113 - .or(entry.updated) 114 - 
.map(|dt| dt.timestamp()); 136 + let published_at = entry.published.or(entry.updated).map(|dt| dt.timestamp()); 115 137 116 138 db::insert_item( 117 139 pool,