A social RSS reader built on the AT Protocol. glean.at
glean atproto atmosphere rss feed social app
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

Make feed fetch scheduler configurable and improve update logic

+103 -156
+3 -2
.env.example
··· 2 2 GLEAN_DB=glean.db 3 3 GLEAN_JETSTREAM=wss://jetstream.glean.at 4 4 GLEAN_PLC_URL=https://didplc.glean.at 5 - GLEAN_SYNC_INTERVAL=1h 6 - GLEAN_CLUSTER_INTERVAL=10m 5 + GLEAN_SYNC_INTERVAL=30m 6 + GLEAN_CLUSTER_INTERVAL=1h 7 + GLEAN_FETCH_INTERVAL=5m 7 8 GLEAN_COLLECTION_DIR_URL=https://lightrail.microcosm.blue/xrpc/com.atproto.sync.listReposByCollection?collection=at.glean.subscription 8 9 # Leave empty for localhost OAuth (development) 9 10 # GLEAN_OAUTH_CLIENT_ID=https://glean.at/oauth/client-metadata
+2 -2
docs/specs.md
··· 328 328 329 329 ### 4.2 Fetch Schedule 330 330 331 - The scheduler uses a single fixed interval with in-flight deduplication: 331 + The scheduler uses a configurable tick interval with in-flight deduplication: 332 332 333 - - **Tick interval**: The scheduler checks for stale feeds every 5 minutes 333 + - **Tick interval**: The scheduler checks for stale feeds every `GLEAN_FETCH_INTERVAL` (default 5 minutes) 334 334 - **Staleness threshold**: Feeds not fetched in the last 30 minutes are eligible 335 335 - **Subscriber filter**: Only feeds with `subscriber_count > 0` are fetched 336 336 - **In-flight dedup**: If a feed is already being fetched (e.g., manual refresh and background scheduler overlap), the second caller waits for the first to complete rather than fetching again
+1 -1
internal/atproto/jetstream.go
··· 130 130 return ctx.Err() 131 131 } 132 132 if err != nil { 133 - jc.logger.Error("jetstream connection error", "error", err) 133 + jc.logger.Warn("jetstream connection lost", "error", err) 134 134 metrics.JetstreamReconnects.Inc() 135 135 } 136 136
+8 -6
internal/cluster/social.go
··· 17 17 18 18 _, err = tx.ExecContext(ctx, ` 19 19 INSERT INTO follow_distances (user_a, user_b, distance) 20 - SELECT user_did, target_did, 1 FROM follows WHERE user_did != target_did 21 - UNION ALL 22 - SELECT f1.user_did, f2.target_did, 2 23 - FROM follows f1 24 - JOIN follows f2 ON f1.target_did = f2.user_did 25 - WHERE f1.user_did != f2.target_did 20 + SELECT user_a, user_b, MIN(distance) FROM ( 21 + SELECT user_did AS user_a, target_did AS user_b, 1 AS distance FROM follows WHERE user_did != target_did 22 + UNION ALL 23 + SELECT f1.user_did, f2.target_did, 2 24 + FROM follows f1 25 + JOIN follows f2 ON f1.target_did = f2.user_did 26 + WHERE f1.user_did != f2.target_did 27 + ) GROUP BY user_a, user_b 26 28 `) 27 29 if err != nil { 28 30 return err
-25
internal/db/article.go
··· 37 37 ReadAt sql.NullTime 38 38 } 39 39 40 - func (db *DB) UpsertArticle(ctx context.Context, article *Article) (int64, error) { 41 - var id int64 42 - err := db.QueryRowContext(ctx, ` 43 - INSERT INTO articles (feed_url, guid, title, url, author, summary, content, published, updated) 44 - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 45 - ON CONFLICT(feed_url, guid) DO NOTHING 46 - RETURNING id 47 - `, article.FeedURL, article.GUID, article.Title, article.URL, article.Author, 48 - article.Summary, article.Content, article.Published, article.Updated).Scan(&id) 49 - if err == sql.ErrNoRows { 50 - err = db.QueryRowContext(ctx, ` 51 - SELECT id FROM articles WHERE feed_url = ? AND guid = ? 52 - `, article.FeedURL, article.GUID).Scan(&id) 53 - } 54 - return id, err 55 - } 56 - 57 40 func (db *DB) UpsertArticlesBatch(ctx context.Context, articles []feed.Article) error { 58 41 if len(articles) == 0 { 59 42 return nil 60 43 } 61 44 62 - err := upsertArticlesBatch(ctx, db, articles) 63 - if err != nil { 64 - err = upsertArticlesBatch(ctx, db, articles) 65 - } 66 - return err 67 - } 68 - 69 - func upsertArticlesBatch(ctx context.Context, db *DB, articles []feed.Article) error { 70 45 tx, err := db.BeginTx(ctx, nil) 71 46 if err != nil { 72 47 return err
+16 -31
internal/db/store.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "database/sql" 6 5 "time" 7 6 8 7 "pkg.rbrt.fr/glean/internal/feed" ··· 37 36 return feeds, nil 38 37 } 39 38 40 - func (a *FeedStoreAdapter) UpsertArticle(ctx context.Context, article *feed.Article) (int64, error) { 41 - dbArticle := &Article{ 42 - FeedURL: article.FeedURL, 43 - GUID: article.GUID, 44 - Title: article.Title, 45 - Summary: sql.NullString{String: article.Summary, Valid: article.Summary != ""}, 46 - Content: sql.NullString{String: article.Content, Valid: article.Content != ""}, 47 - Author: sql.NullString{String: article.Author, Valid: article.Author != ""}, 48 - URL: sql.NullString{String: article.URL, Valid: article.URL != ""}, 49 - } 50 - if !article.Published.IsZero() { 51 - dbArticle.Published = sql.NullTime{Time: article.Published, Valid: true} 52 - } 53 - if !article.Updated.IsZero() { 54 - dbArticle.Updated = sql.NullTime{Time: article.Updated, Valid: true} 55 - } 56 - return a.db.UpsertArticle(ctx, dbArticle) 57 - } 58 - 59 - func (a *FeedStoreAdapter) UpsertArticlesBatch(ctx context.Context, articles []feed.Article) error { 60 - return a.db.UpsertArticlesBatch(ctx, articles) 61 - } 62 - 63 - func (a *FeedStoreAdapter) MarkFeedFetched(ctx context.Context, feedURL, etag, lastModified string) error { 64 - return a.db.MarkFeedFetched(ctx, feedURL, etag, lastModified) 65 - } 66 - 67 - func (a *FeedStoreAdapter) MarkFeedFetchError(ctx context.Context, feedURL, lastError string) error { 39 + func (a *FeedStoreAdapter) RecordFetchError(ctx context.Context, feedURL, lastError string) error { 68 40 return a.db.MarkFeedFetchError(ctx, feedURL, lastError) 69 41 } 70 42 71 - func (a *FeedStoreAdapter) UpdateFeedFavicon(ctx context.Context, feedURL, faviconURL string) error { 72 - return a.db.UpdateFeedFavicon(ctx, feedURL, faviconURL) 43 + func (a *FeedStoreAdapter) StoreFetchResult(ctx context.Context, feedURL, etag, lastModified string, articles []feed.Article, faviconURL string) error { 44 + if err := a.db.MarkFeedFetched(ctx, feedURL, etag, lastModified); err != nil { 45 + return err 46 + } 47 + if len(articles) > 0 { 48 + if err := a.db.UpsertArticlesBatch(ctx, articles); err != nil { 49 + return err 50 + } 51 + } 52 + if faviconURL != "" { 53 + if err := a.db.UpdateFeedFavicon(ctx, feedURL, faviconURL); err != nil { 54 + return err 55 + } 56 + } 57 + return nil 73 58 }
+3 -6
internal/feed/discover.go
··· 63 63 return strings.TrimRight(s, "/") 64 64 } 65 65 66 - func ResolveFavicon(ctx context.Context, feedURL, siteURL, parsedFavicon string) string { 67 - if parsedFavicon != "" { 68 - return cleanFavicon(parsedFavicon) 69 - } 66 + func ResolveFavicon(ctx context.Context, feedURL, siteURL string) string { 70 67 target := siteURL 71 68 if target == "" { 72 69 target = feedURL ··· 117 114 origin.Fragment = "" 118 115 119 116 type result struct { 120 - url string 117 + url string 121 118 found bool 122 119 } 123 120 found := make(chan result, 1) ··· 201 198 202 199 base := resp.Request.URL 203 200 return base, string(body) 204 - } 201 + }
+34 -48
internal/feed/fetcher.go
··· 69 69 70 70 type FeedStore interface { 71 71 GetFeedsToFetch(ctx context.Context, olderThan time.Duration, limit int) ([]*Feed, error) 72 - UpsertArticle(ctx context.Context, article *Article) (int64, error) 73 - UpsertArticlesBatch(ctx context.Context, articles []Article) error 74 - MarkFeedFetched(ctx context.Context, feedURL, etag, lastModified string) error 75 - MarkFeedFetchError(ctx context.Context, feedURL, lastError string) error 76 - UpdateFeedFavicon(ctx context.Context, feedURL, faviconURL string) error 72 + StoreFetchResult(ctx context.Context, feedURL, etag, lastModified string, articles []Article, faviconURL string) error 73 + RecordFetchError(ctx context.Context, feedURL, lastError string) error 77 74 } 78 75 79 76 type fetchCall struct { ··· 81 78 } 82 79 83 80 type Scheduler struct { 84 - fetcher *Fetcher 85 - store FeedStore 86 - logger *slog.Logger 87 - interval time.Duration 88 - inFlight sync.Map 81 + fetcher *Fetcher 82 + store FeedStore 83 + logger *slog.Logger 84 + tickInterval time.Duration 85 + staleInterval time.Duration 86 + inFlight sync.Map 89 87 } 90 88 91 - func NewScheduler(store FeedStore, logger *slog.Logger) *Scheduler { 89 + func NewScheduler(store FeedStore, logger *slog.Logger, tickInterval, staleInterval time.Duration) *Scheduler { 92 90 return &Scheduler{ 93 - fetcher: NewFetcher(), 94 - store: store, 95 - logger: logger, 96 - interval: 30 * time.Minute, 97 - inFlight: sync.Map{}, 91 + fetcher: NewFetcher(), 92 + store: store, 93 + logger: logger, 94 + tickInterval: tickInterval, 95 + staleInterval: staleInterval, 96 + inFlight: sync.Map{}, 98 97 } 99 98 } 100 99 101 100 func (s *Scheduler) Run(ctx context.Context) error { 102 - ticker := time.NewTicker(5 * time.Minute) 101 + ticker := time.NewTicker(s.tickInterval) 103 102 defer ticker.Stop() 104 - 105 - s.fetchAll(ctx) 106 103 107 104 for { 108 105 select { ··· 115 112 } 116 113 117 114 func (s *Scheduler) fetchAll(ctx context.Context) { 118 - feeds, err := s.store.GetFeedsToFetch(ctx, s.interval, 200) 115 + feeds, err := s.store.GetFeedsToFetch(ctx, s.staleInterval, 500) 119 116 if err != nil { 120 117 s.logger.Error("failed to get feeds", "error", err) 121 118 return ··· 151 148 start := time.Now() 152 149 result, newEtag, newLastModified, err := s.fetcher.Fetch(ctx, feed.URL, feed.ETag, feed.LastModified) 153 150 metrics.FeedsFetchedDuration.Observe(time.Since(start).Seconds()) 151 + metrics.FeedsFetched.Inc() 152 + metrics.FeedsFetchedLast.Set(float64(time.Now().Unix())) 154 153 if err != nil { 155 - metrics.FeedsFetched.WithLabelValues("error").Inc() 156 154 s.logger.Error("failed to fetch feed", "error", err, "feed", feed.URL) 157 - if updErr := s.store.MarkFeedFetchError(ctx, feed.URL, err.Error()); updErr != nil { 158 - s.logger.Error("failed to update feed fetch error", "error", updErr, "feed", feed.URL) 159 - } 155 + s.store.RecordFetchError(ctx, feed.URL, err.Error()) 160 156 return 161 157 } 162 158 163 159 if result == nil { 164 - metrics.FeedsFetched.WithLabelValues("not_modified").Inc() 165 - if updErr := s.store.MarkFeedFetched(ctx, feed.URL, feed.ETag, feed.LastModified); updErr != nil { 166 - s.logger.Error("failed to update feed fetch result", "error", updErr, "feed", feed.URL) 160 + s.logger.Info("fetched articles", "feed", feed.URL, "count", 0) 161 + if err := s.store.StoreFetchResult(ctx, feed.URL, newEtag, newLastModified, nil, ""); err != nil { 162 + s.logger.Error("failed to store feed fetch result", "error", err, "feed", feed.URL) 167 163 } 168 164 return 169 165 } 170 166 171 - metrics.FeedsFetched.WithLabelValues("success").Inc() 172 - 173 - for _, article := range result.Articles { 174 - article.FeedURL = feed.URL 167 + faviconURL := result.Feed.FaviconURL 168 + if faviconURL == "" && feed.FaviconURL == "" { 169 + faviconURL = ResolveFavicon(context.Background(), feed.URL, feed.SiteURL) 175 170 } 176 - if err := s.store.UpsertArticlesBatch(ctx, result.Articles); err != nil { 177 - s.logger.Error("failed to upsert articles", "error", err, "feed", feed.URL) 171 + 172 + if err := s.store.StoreFetchResult(ctx, feed.URL, newEtag, newLastModified, result.Articles, faviconURL); err != nil { 173 + s.logger.Error("failed to store feed fetch result", "error", err, "feed", feed.URL) 178 174 } else { 179 - metrics.ArticlesUpserted.Add(float64(len(result.Articles))) 180 - } 181 - 182 - if err := s.store.MarkFeedFetched(ctx, feed.URL, newEtag, newLastModified); err != nil { 183 - s.logger.Error("failed to update feed fetch result", "error", err, "feed", feed.URL) 184 - } 185 - 186 - if result != nil && result.Feed.FaviconURL != "" { 187 - _ = s.store.UpdateFeedFavicon(ctx, feed.URL, result.Feed.FaviconURL) 188 - } else if feed.FaviconURL == "" { 189 - go func() { 190 - if f := ResolveFavicon(context.Background(), feed.URL, feed.SiteURL, ""); f != "" { 191 - _ = s.store.UpdateFeedFavicon(context.Background(), feed.URL, f) 192 - } 193 - }() 175 + articleCount := len(result.Articles) 176 + s.logger.Info("fetched articles", "feed", feed.URL, "count", articleCount) 177 + if articleCount > 0 { 178 + metrics.ArticlesUpserted.Add(float64(articleCount)) 179 + } 194 180 } 195 181 }
+2 -10
internal/feed/opml.go
··· 4 4 "bytes" 5 5 "encoding/xml" 6 6 "io" 7 + "slices" 7 8 "strings" 8 9 ) 9 10 ··· 115 116 } 116 117 if f.Category != "" { 117 118 categoryMap[f.Category] = append(categoryMap[f.Category], outline) 118 - if !contains(categories, f.Category) { 119 + if !slices.Contains(categories, f.Category) { 119 120 categories = append(categories, f.Category) 120 121 } 121 122 } else { ··· 143 144 144 145 return buf.Bytes(), nil 145 146 } 146 - 147 - func contains(slice []string, s string) bool { 148 - for _, v := range slice { 149 - if v == s { 150 - return true 151 - } 152 - } 153 - return false 154 - }
+7 -2
internal/metrics/metrics.go
··· 6 6 ) 7 7 8 8 var ( 9 - FeedsFetched = promauto.NewCounterVec(prometheus.CounterOpts{ 9 + FeedsFetched = promauto.NewCounter(prometheus.CounterOpts{ 10 10 Name: "glean_feeds_fetched_total", 11 11 Help: "Total number of feed fetch attempts", 12 - }, []string{"status"}) 12 + }) 13 + 14 + FeedsFetchedLast = promauto.NewGauge(prometheus.GaugeOpts{ 15 + Name: "glean_feeds_fetched_last_timestamp_seconds", 16 + Help: "Unix timestamp of last feed fetch", 17 + }) 13 18 14 19 FeedsFetchedDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 15 20 Name: "glean_feed_fetch_duration_seconds",
+24 -22
internal/server/feeds_handler.go
··· 72 72 return 73 73 } 74 74 75 + var feedTitle string 76 + var faviconURL string 75 77 if result != nil { 76 - f := &db.Feed{ 77 - FeedURL: feedURL, 78 - Title: nullString(result.Feed.Title), 79 - SiteURL: nullString(result.Feed.SiteURL), 80 - Description: nullString(result.Feed.Description), 81 - FeedType: nullString(result.Feed.Type), 82 - } 83 - if err := s.db.UpsertFeed(r.Context(), f); err != nil { 84 - s.logger.Error("failed to upsert feed", "error", err) 85 - http.Error(w, err.Error(), http.StatusInternalServerError) 86 - return 78 + feedTitle = result.Feed.Title 79 + faviconURL = result.Feed.FaviconURL 80 + if faviconURL == "" { 81 + go func() { 82 + if f := feed.ResolveFavicon(context.Background(), feedURL, result.Feed.SiteURL); f != "" { 83 + _ = s.db.UpdateFeedFavicon(context.Background(), feedURL, f) 84 + } 85 + }() 87 86 } 87 + } 88 88 89 - go func() { 90 - if f := feed.ResolveFavicon(context.Background(), feedURL, result.Feed.SiteURL, result.Feed.FaviconURL); f != "" { 91 - _ = s.db.UpdateFeedFavicon(context.Background(), feedURL, f) 92 - } 93 - }() 89 + f := &db.Feed{ 90 + FeedURL: feedURL, 91 + Title: nullString(feedTitle), 92 + SiteURL: nullString(result.Feed.SiteURL), 93 + Description: nullString(result.Feed.Description), 94 + FeedType: nullString(result.Feed.Type), 95 + FaviconURL: nullString(faviconURL), 94 96 } 95 - 96 - var feedTitle string 97 - if result != nil { 98 - feedTitle = result.Feed.Title 97 + if err := s.db.UpsertFeed(r.Context(), f); err != nil { 98 + s.logger.Error("failed to upsert feed", "error", err) 99 + http.Error(w, err.Error(), http.StatusInternalServerError) 100 + return 99 101 } 100 102 101 103 var subURI, subCID string ··· 235 237 } 236 238 237 239 go func(feedURL, siteURL string) { 238 - if f := feed.ResolveFavicon(context.Background(), feedURL, siteURL, ""); f != "" { 239 - _ = s.db.UpdateFeedFavicon(context.Background(), feedURL, f) 240 + if fav := feed.ResolveFavicon(context.Background(), feedURL, siteURL); fav != "" { 241 + _ = s.db.UpdateFeedFavicon(context.Background(), feedURL, fav) 240 242 } 241 243 }(fu.URL, fu.SiteURL) 242 244
+2 -1
main.go
··· 25 25 jetstreamURL := flag.String("jetstream", envOr("GLEAN_JETSTREAM", "wss://jetstream.glean.at"), "Jetstream URL") 26 26 syncInterval := flag.Duration("sync-interval", envDuration("GLEAN_SYNC_INTERVAL", 1*time.Hour), "PDS sync interval") 27 27 clusterInterval := flag.Duration("cluster-interval", envDuration("GLEAN_CLUSTER_INTERVAL", 10*time.Minute), "cluster recomputation interval") 28 + fetchInterval := flag.Duration("fetch-interval", envDuration("GLEAN_FETCH_INTERVAL", 5*time.Minute), "feed fetch tick interval") 28 29 collectionDirURL := flag.String("collection-dir", envOr("GLEAN_COLLECTION_DIR_URL", ""), "collection directory URL for startup backfill") 29 30 backfillConcurrency := flag.Int("backfill-concurrency", envInt("GLEAN_BACKFILL_CONCURRENCY", 5), "max concurrent backfill workers") 30 31 flag.Parse() ··· 44 45 callbackURL := envOr("GLEAN_OAUTH_REDIRECT_URL", "") 45 46 46 47 storeAdapter := db.NewFeedStoreAdapter(database) 47 - scheduler := feed.NewScheduler(storeAdapter, logger) 48 + scheduler := feed.NewScheduler(storeAdapter, logger, *fetchInterval, 30*time.Minute) 48 49 49 50 engine := cluster.NewEngine(database.DB, logger) 50 51
+1
readme.md
··· 44 44 | `GLEAN_JETSTREAM` | `wss://jetstream.glean.at` | Jetstream WebSocket URL | 45 45 | `GLEAN_SYNC_INTERVAL` | `1h` | PDS sync interval (Go duration: `30m`, `2h30m`, etc.) | 46 46 | `GLEAN_CLUSTER_INTERVAL` | `10m` | Cluster recomputation interval (Go duration) | 47 + | `GLEAN_FETCH_INTERVAL` | `5m` | Feed fetch scheduler tick interval (Go duration) | 47 48 | `GLEAN_COLLECTION_DIR_URL` | _(empty)_ | Collection directory URL for startup backfill | 48 49 | `GLEAN_PLC_URL` | `https://didplc.glean.at` | PLC directory URL for DID resolution | 49 50 | `GLEAN_OAUTH_CLIENT_ID` | _(empty)_ | OAuth client metadata URL (leave empty for localhost dev) |