A social RSS reader built on the AT Protocol. glean.at
glean atproto atmosphere rss feed social app
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add configurable concurrency to collection directory backfill

+46 -22
+34 -21
internal/server/server.go
··· 11 11 "slices" 12 12 "strconv" 13 13 "strings" 14 + "sync" 14 15 "time" 15 16 16 17 "github.com/go-chi/chi/v5" ··· 441 442 } 442 443 } 443 444 444 - func (s *Server) BackfillFromCollectionDir(ctx context.Context, collectionDirURL string) { 445 + func (s *Server) BackfillFromCollectionDir(ctx context.Context, collectionDirURL string, concurrency int) { 445 446 if collectionDirURL == "" { 446 447 return 447 448 } ··· 469 470 470 471 s.logger.Info("collection directory backfill", "total", len(dids), "missing", len(missing)) 471 472 473 + sem := make(chan struct{}, concurrency) 474 + var wg sync.WaitGroup 475 + 472 476 for _, did := range missing { 473 477 if ctx.Err() != nil { 474 - return 478 + break 475 479 } 476 480 477 - handle := did 478 - if ident, err := atproto.ResolveIdentity(ctx, did); err == nil { 479 - handle = ident.Handle.String() 480 - } 481 + sem <- struct{}{} 482 + wg.Add(1) 481 483 482 - if _, err := s.db.CreateUser(ctx, did, handle, "", ""); err != nil { 483 - s.logger.Error("failed to create user during backfill", "error", err, "did", did) 484 - continue 485 - } 484 + go func(did string) { 485 + defer func() { <-sem }() 486 + defer wg.Done() 486 487 487 - pdsURL, err := atproto.ResolvePDSEndpoint(ctx, did) 488 - if err != nil { 489 - s.logger.Error("failed to resolve PDS for backfill", "error", err, "did", did) 490 - continue 491 - } 488 + handle := did 489 + if ident, err := atproto.ResolveIdentity(ctx, did); err == nil { 490 + handle = ident.Handle.String() 491 + } 492 492 493 - client := atproto.NewUnauthenticatedClient(pdsURL) 494 - sync := atproto.NewSync(s.db, client, s.logger) 495 - if err := sync.Run(ctx, did); err != nil { 496 - s.logger.Error("backfill sync failed", "error", err, "did", did) 497 - } 493 + if _, err := s.db.CreateUser(ctx, did, handle, "", ""); err != nil { 494 + s.logger.Error("failed to create user during backfill", "error", err, "did", did) 495 + return 496 + } 497 + 498 + pdsURL, err := atproto.ResolvePDSEndpoint(ctx, did) 499 + if err != nil { 500 + s.logger.Error("failed to resolve PDS for backfill", "error", err, "did", did) 501 + return 502 + } 503 + 504 + client := atproto.NewUnauthenticatedClient(pdsURL) 505 + sync := atproto.NewSync(s.db, client, s.logger) 506 + if err := sync.Run(ctx, did); err != nil { 507 + s.logger.Error("backfill sync failed", "error", err, "did", did) 508 + } 498 509 499 - s.refreshUserFeeds(ctx, did) 510 + s.refreshUserFeeds(ctx, did) 511 + }(did) 500 512 } 501 513 514 + wg.Wait() 502 515 s.logger.Info("collection directory backfill complete") 503 516 } 504 517
+12 -1
main.go
··· 8 8 "net/http" 9 9 "os" 10 10 "os/signal" 11 + "strconv" 11 12 "syscall" 12 13 "time" 13 14 ··· 25 26 syncInterval := flag.Duration("sync-interval", envDuration("GLEAN_SYNC_INTERVAL", 1*time.Hour), "PDS sync interval") 26 27 clusterInterval := flag.Duration("cluster-interval", envDuration("GLEAN_CLUSTER_INTERVAL", 10*time.Minute), "cluster recomputation interval") 27 28 collectionDirURL := flag.String("collection-dir", envOr("GLEAN_COLLECTION_DIR_URL", ""), "collection directory URL for startup backfill") 29 + backfillConcurrency := flag.Int("backfill-concurrency", envInt("GLEAN_BACKFILL_CONCURRENCY", 5), "max concurrent backfill workers") 28 30 flag.Parse() 29 31 30 32 atproto.InitIdentity(envOr("GLEAN_PLC_URL", "https://didplc.glean.at")) ··· 70 72 srv.PeriodicSync(ctx, *syncInterval) 71 73 }() 72 74 go func() { 73 - srv.BackfillFromCollectionDir(ctx, *collectionDirURL) 75 + srv.BackfillFromCollectionDir(ctx, *collectionDirURL, *backfillConcurrency) 74 76 }() 75 77 go func() { 76 78 if err := jetstream.Start(ctx); err != nil && ctx.Err() == nil { ··· 125 127 } 126 128 return fallback 127 129 } 130 + 131 + func envInt(key string, fallback int) int { 132 + if v := os.Getenv(key); v != "" { 133 + if n, err := strconv.Atoi(v); err == nil { 134 + return n 135 + } 136 + } 137 + return fallback 138 + }