A social RSS reader built on the AT Protocol. glean.at
glean atproto atmosphere rss feed social app
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

Tweaks in fetcher for improved fetching

+18 -18
+9 -5
internal/db/store.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "fmt" 5 6 "time" 6 7 7 8 "pkg.rbrt.fr/glean/internal/feed" ··· 32 33 } 33 34 34 35 func (a *FeedStoreAdapter) StoreFetchResult(ctx context.Context, feedURL string, articles []feed.Article, faviconURL string) error { 35 - if err := a.store.MarkFeedFetched(ctx, feedURL); err != nil { 36 - return err 37 - } 38 36 if len(articles) > 0 { 39 37 if err := a.store.UpsertArticlesBatch(ctx, articles); err != nil { 40 - return err 38 + return fmt.Errorf("failed to save articles: %w", err) 41 39 } 42 40 } 41 + 42 + if err := a.store.MarkFeedFetched(ctx, feedURL); err != nil { 43 + return fmt.Errorf("failed to mark as fetched: %w", err) 44 + } 45 + 43 46 if faviconURL != "" { 44 47 if err := a.store.UpdateFeedFavicon(ctx, feedURL, faviconURL); err != nil { 45 - return err 48 + return fmt.Errorf("failed to save favicon: %w", err) 46 49 } 47 50 } 51 + 48 52 return nil 49 53 }
+9 -13
internal/feed/fetcher.go
··· 49 49 } 50 50 } 51 51 52 + if lastResp != nil { 53 + lastResp.Body.Close() 54 + } 55 + 52 56 result, resp, err := f.executeRequest(ctx, feedURL) 53 57 lastResp = resp 54 58 if err == nil { ··· 79 83 return nil, nil, fmt.Errorf("fetching feed: %w", err) 80 84 } 81 85 defer resp.Body.Close() 82 - 83 - if resp.StatusCode == http.StatusTooManyRequests { 84 - return nil, resp, fmt.Errorf("rate limited (retry-after: %s)", resp.Header.Get("Retry-After")) 85 - } 86 - 87 - if resp.StatusCode >= 500 { 88 - return nil, resp, fmt.Errorf("server error: %d", resp.StatusCode) 89 - } 90 - 91 - if resp.StatusCode < 200 || resp.StatusCode >= 300 { 92 - return nil, resp, fmt.Errorf("unexpected status: %d", resp.StatusCode) 93 - } 94 86 95 87 result, err := Parse(resp.Body, feedURL) 96 88 if err != nil { ··· 159 151 } 160 152 161 153 func (s *Scheduler) fetchAll(ctx context.Context, olderThan time.Duration) { 162 - feeds, err := s.store.GetFeedsToFetch(ctx, olderThan, 10_000) 154 + feeds, err := s.store.GetFeedsToFetch(ctx, olderThan, 1000) 163 155 if err != nil { 164 156 s.logger.Error("failed to get feeds", "error", err) 165 157 return 166 158 } 167 159 160 + start := time.Now() 168 161 s.logger.Info("fetching feeds", "count", len(feeds), "older_than", olderThan) 169 162 170 163 g, gCtx := errgroup.WithContext(ctx) ··· 176 169 }) 177 170 } 178 171 _ = g.Wait() 172 + 173 + s.logger.Info("fetching feeds complete", "duration", time.Since(start).Seconds()) 179 174 } 180 175 181 176 func (s *Scheduler) FetchFeed(ctx context.Context, feed *Feed) { 182 177 call := &fetchCall{done: make(chan struct{})} 183 178 if actual, loaded := s.inFlight.LoadOrStore(feed.URL, call); loaded { 179 + s.logger.Debug("feed already in flight, skipping", "feed", feed.URL) 184 180 select { 185 181 case <-actual.(*fetchCall).done: 186 182 case <-ctx.Done():