this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

pr feedback: .Shudown() instead of Context

authored by

Brian Olson and committed by
Brian Olson
7b183a08 58c1cd21

+23 -14
+2 -4
cmd/bigsky/main.go
··· 401 401 402 402 rf := indexer.NewRepoFetcher(db, repoman, cctx.Int("max-fetch-concurrency")) 403 403 404 - ctx, ctxCancel := context.WithCancel(context.Background()) 405 - defer ctxCancel() 406 - 407 - ix, err := indexer.NewIndexer(ctx, db, notifman, evtman, cachedidr, rf, true, false, cctx.Bool("spidering")) 404 + ix, err := indexer.NewIndexer(db, notifman, evtman, cachedidr, rf, true, false, cctx.Bool("spidering")) 408 405 if err != nil { 409 406 return err 410 407 } 408 + defer ix.Shutdown() 411 409 412 410 rlskip := cctx.String("bsky-social-rate-limit-skip") 413 411 ix.ApplyPDSClientSettings = func(c *xrpc.Client) {
+11 -5
indexer/crawler.go
··· 28 28 doRepoCrawl func(context.Context, *crawlWork) error 29 29 30 30 concurrency int 31 + 32 + done chan struct{} 31 33 } 32 34 33 - func NewCrawlDispatcher(ctx context.Context, repoFn func(context.Context, *crawlWork) error, concurrency int) (*CrawlDispatcher, error) { 35 + func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurrency int) (*CrawlDispatcher, error) { 34 36 if concurrency < 1 { 35 37 return nil, fmt.Errorf("must specify a non-zero positive integer for crawl dispatcher concurrency") 36 38 } ··· 44 46 concurrency: concurrency, 45 47 todo: make(map[models.Uid]*crawlWork), 46 48 inProgress: make(map[models.Uid]*crawlWork), 49 + done: make(chan struct{}), 47 50 } 48 - go out.CatchupRepoGaugePoller(ctx) 51 + go out.CatchupRepoGaugePoller() 49 52 50 53 return out, nil 51 54 } ··· 56 59 for i := 0; i < c.concurrency; i++ { 57 60 go c.fetchWorker() 58 61 } 62 + } 63 + 64 + func (c *CrawlDispatcher) Shutdown() { 65 + close(c.done) 59 66 } 60 67 61 68 type catchupJob struct { ··· 282 289 return len(c.inProgress) + len(c.todo) 283 290 } 284 291 285 - func (c *CrawlDispatcher) CatchupRepoGaugePoller(ctx context.Context) { 286 - done := ctx.Done() 292 + func (c *CrawlDispatcher) CatchupRepoGaugePoller() { 287 293 ticker := time.NewTicker(30 * time.Second) 288 294 defer ticker.Stop() 289 295 for { 290 296 select { 291 - case <-done: 297 + case <-c.done: 292 298 case <-ticker.C: 293 299 catchupReposGauge.Set(float64(c.countReposInSlowPath())) 294 300 }
+6 -2
indexer/indexer.go
··· 47 47 ApplyPDSClientSettings func(*xrpc.Client) 48 48 } 49 49 50 - func NewIndexer(ctx context.Context, db *gorm.DB, notifman notifs.NotificationManager, evtman *events.EventManager, didr did.Resolver, fetcher *RepoFetcher, crawl, aggregate, spider bool) (*Indexer, error) { 50 + func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events.EventManager, didr did.Resolver, fetcher *RepoFetcher, crawl, aggregate, spider bool) (*Indexer, error) { 51 51 db.AutoMigrate(&models.FeedPost{}) 52 52 db.AutoMigrate(&models.ActorInfo{}) 53 53 db.AutoMigrate(&models.FollowRecord{}) ··· 68 68 } 69 69 70 70 if crawl { 71 - c, err := NewCrawlDispatcher(ctx, fetcher.FetchAndIndexRepo, fetcher.MaxConcurrency) 71 + c, err := NewCrawlDispatcher(fetcher.FetchAndIndexRepo, fetcher.MaxConcurrency) 72 72 if err != nil { 73 73 return nil, err 74 74 } ··· 78 78 } 79 79 80 80 return ix, nil 81 + } 82 + 83 + func (ix *Indexer) Shutdown() { 84 + ix.Crawler.Shutdown() 81 85 } 82 86 83 87 func (ix *Indexer) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent) error {
+2 -1
indexer/posts_test.go
··· 63 63 64 64 rf := NewRepoFetcher(maindb, repoman, 10) 65 65 66 - ix, err := NewIndexer(context.Background(), maindb, notifman, evtman, didr, rf, false, true, true) 66 + ix, err := NewIndexer(maindb, notifman, evtman, didr, rf, false, true, true) 67 67 if err != nil { 68 68 t.Fatal(err) 69 69 } ··· 81 81 if ix.dir != "" { 82 82 _ = os.RemoveAll(ix.dir) 83 83 } 84 + ix.ix.Shutdown() 84 85 } 85 86 86 87 // TODO: dedupe this out into some testing utility package
+1 -1
pds/server.go
··· 79 79 80 80 rf := indexer.NewRepoFetcher(db, repoman, 10) 81 81 82 - ix, err := indexer.NewIndexer(context.Background(), db, notifman, evtman, didr, rf, false, true, true) 82 + ix, err := indexer.NewIndexer(db, notifman, evtman, didr, rf, false, true, true) 83 83 if err != nil { 84 84 return nil, err 85 85 }
+1 -1
testing/utils.go
··· 569 569 evtman := events.NewEventManager(diskpersist) 570 570 rf := indexer.NewRepoFetcher(maindb, repoman, 10) 571 571 572 - ix, err := indexer.NewIndexer(context.Background(), maindb, notifman, evtman, didr, rf, true, true, true) 572 + ix, err := indexer.NewIndexer(maindb, notifman, evtman, didr, rf, true, true, true) 573 573 if err != nil { 574 574 return nil, err 575 575 }