A social RSS reader built on the AT Protocol. glean.at
glean atproto atmosphere rss feed social app
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

Optimize feed fetching and improve HTTP transport

+15 -25
+1 -1
internal/atproto/sync.go
··· 267 267 } 268 268 269 269 g, gCtx := errgroup.WithContext(ctx) 270 - g.SetLimit(10) 270 + g.SetLimit(20) 271 271 272 272 dids := make([]string, 0, len(activeFollows)) 273 273 profiles := make([]db.UserData, len(activeFollows))
-8
internal/db/multi.go
··· 23 23 articlesPath := basePath + "_articles" 24 24 recsPath := basePath + "_recs" 25 25 26 - for _, p := range []string{articlesPath, recsPath} { 27 - f, err := sql.Open("sqlite3", p+"?"+DSN) 28 - if err != nil { 29 - return nil, err 30 - } 31 - f.Close() 32 - } 33 - 34 26 seq := atomic.AddInt64(&multiDriverSeq, 1) 35 27 driverName := fmt.Sprintf("sqlite3_glean_multi_%d", seq) 36 28
+11 -13
internal/feed/fetcher.go
··· 8 8 "sync" 9 9 "time" 10 10 11 + "golang.org/x/sync/errgroup" 11 12 "pkg.rbrt.fr/glean/internal/httpclient" 12 13 "pkg.rbrt.fr/glean/internal/metrics" 13 14 ) ··· 171 172 } 172 173 173 174 func (s *Scheduler) fetchAll(ctx context.Context) { 174 - feeds, err := s.store.GetFeedsToFetch(ctx, s.staleInterval, 10_000) 175 + feeds, err := s.store.GetFeedsToFetch(ctx, s.staleInterval, 1_000) 175 176 if err != nil { 176 177 s.logger.Error("failed to get feeds", "error", err) 177 178 return 178 179 } 179 180 180 - sem := make(chan struct{}, 10) 181 - var wg sync.WaitGroup 181 + s.logger.Info("feeds fetched", "count", len(feeds)) 182 + 183 + g, gCtx := errgroup.WithContext(ctx) 184 + g.SetLimit(20) 182 185 for _, f := range feeds { 183 - wg.Add(1) 184 - sem <- struct{}{} 185 - go func(feed *Feed) { 186 - defer func() { 187 - <-sem 188 - wg.Done() 189 - }() 190 - s.FetchFeed(ctx, feed) 191 - }(f) 186 + g.Go(func() error { 187 + s.FetchFeed(gCtx, f) 188 + return nil 189 + }) 192 190 } 193 - wg.Wait() 191 + _ = g.Wait() 194 192 } 195 193 196 194 func (s *Scheduler) FetchFeed(ctx context.Context, feed *Feed) {
+3 -3
internal/httpclient/httpclient.go
··· 19 19 func NewTransport() *http.Transport { 20 20 return &http.Transport{ 21 21 DialContext: (&net.Dialer{ 22 - Timeout: 10 * time.Second, 23 - KeepAlive: 15 * time.Second, 22 + Timeout: 10 * time.Second, 23 + KeepAlive: 15 * time.Second, 24 + FallbackDelay: 300 * time.Millisecond, 24 25 }).DialContext, 25 26 MaxIdleConns: 50, 26 27 IdleConnTimeout: 10 * time.Second, ··· 31 32 32 33 func SetDefaultHeaders(req *http.Request) { 33 34 req.Header.Set("User-Agent", UserAgent) 34 - req.Header.Set("Accept-Encoding", "br,gzip") 35 35 req.Header.Set("Connection", "close") 36 36 } 37 37