this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add gauge indexer_catchup_repos; fix bigsky NewIndexer usage

authored by

Brian Olson and committed by
Brian Olson
58c1cd21 bc9dea4d

+40 -9
+4 -1
cmd/bigsky/main.go
··· 401 401 402 402 rf := indexer.NewRepoFetcher(db, repoman, cctx.Int("max-fetch-concurrency")) 403 403 404 - ix, err := indexer.NewIndexer(db, notifman, evtman, cachedidr, rf, true, cctx.Bool("spidering"), false) 404 + ctx, ctxCancel := context.WithCancel(context.Background()) 405 + defer ctxCancel() 406 + 407 + ix, err := indexer.NewIndexer(ctx, db, notifman, evtman, cachedidr, rf, true, false, cctx.Bool("spidering")) 405 408 if err != nil { 406 409 return err 407 410 }
+26 -3
indexer/crawler.go
··· 4 4 "context" 5 5 "fmt" 6 6 "sync" 7 + "time" 7 8 8 9 comatproto "github.com/bluesky-social/indigo/api/atproto" 9 10 "github.com/bluesky-social/indigo/models" ··· 29 30 concurrency int 30 31 } 31 32 32 - func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurrency int) (*CrawlDispatcher, error) { 33 + func NewCrawlDispatcher(ctx context.Context, repoFn func(context.Context, *crawlWork) error, concurrency int) (*CrawlDispatcher, error) { 33 34 if concurrency < 1 { 34 35 return nil, fmt.Errorf("must specify a non-zero positive integer for crawl dispatcher concurrency") 35 36 } 36 37 37 - return &CrawlDispatcher{ 38 + out := &CrawlDispatcher{ 38 39 ingest: make(chan *models.ActorInfo), 39 40 repoSync: make(chan *crawlWork), 40 41 complete: make(chan models.Uid), ··· 43 44 concurrency: concurrency, 44 45 todo: make(map[models.Uid]*crawlWork), 45 46 inProgress: make(map[models.Uid]*crawlWork), 46 - }, nil 47 + } 48 + go out.CatchupRepoGaugePoller(ctx) 49 + 50 + return out, nil 47 51 } 48 52 49 53 func (c *CrawlDispatcher) Run() { ··· 271 275 272 276 return false 273 277 } 278 + 279 + func (c *CrawlDispatcher) countReposInSlowPath() int { 280 + c.maplk.Lock() 281 + defer c.maplk.Unlock() 282 + return len(c.inProgress) + len(c.todo) 283 + } 284 + 285 + func (c *CrawlDispatcher) CatchupRepoGaugePoller(ctx context.Context) { 286 + done := ctx.Done() 287 + ticker := time.NewTicker(30 * time.Second) 288 + defer ticker.Stop() 289 + for { 290 + select { 291 + case <-done: 292 + case <-ticker.C: 293 + catchupReposGauge.Set(float64(c.countReposInSlowPath())) 294 + } 295 + } 296 + }
+2 -2
indexer/indexer.go
··· 47 47 ApplyPDSClientSettings func(*xrpc.Client) 48 48 } 49 49 50 - func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events.EventManager, didr did.Resolver, fetcher *RepoFetcher, crawl, aggregate, spider bool) (*Indexer, error) { 50 + func NewIndexer(ctx context.Context, db *gorm.DB, notifman notifs.NotificationManager, evtman *events.EventManager, didr did.Resolver, fetcher *RepoFetcher, crawl, aggregate, spider bool) (*Indexer, error) { 51 51 db.AutoMigrate(&models.FeedPost{}) 52 52 db.AutoMigrate(&models.ActorInfo{}) 53 53 db.AutoMigrate(&models.FollowRecord{}) ··· 68 68 } 69 69 70 70 if crawl { 71 - c, err := NewCrawlDispatcher(fetcher.FetchAndIndexRepo, fetcher.MaxConcurrency) 71 + c, err := NewCrawlDispatcher(ctx, fetcher.FetchAndIndexRepo, fetcher.MaxConcurrency) 72 72 if err != nil { 73 73 return nil, err 74 74 }
+5
indexer/metrics.go
··· 39 39 Name: "indexer_catchup_events_failed", 40 40 Help: "Number of catchup events processed", 41 41 }, []string{"err"}) 42 + 43 + var catchupReposGauge = promauto.NewGauge(prometheus.GaugeOpts{ 44 + Name: "indexer_catchup_repos", 45 + Help: "Number of repos waiting on catchup", 46 + })
+1 -1
indexer/posts_test.go
··· 63 63 64 64 rf := NewRepoFetcher(maindb, repoman, 10) 65 65 66 - ix, err := NewIndexer(maindb, notifman, evtman, didr, rf, false, true, true) 66 + ix, err := NewIndexer(context.Background(), maindb, notifman, evtman, didr, rf, false, true, true) 67 67 if err != nil { 68 68 t.Fatal(err) 69 69 }
+1 -1
pds/server.go
··· 79 79 80 80 rf := indexer.NewRepoFetcher(db, repoman, 10) 81 81 82 - ix, err := indexer.NewIndexer(db, notifman, evtman, didr, rf, false, true, true) 82 + ix, err := indexer.NewIndexer(context.Background(), db, notifman, evtman, didr, rf, false, true, true) 83 83 if err != nil { 84 84 return nil, err 85 85 }
+1 -1
testing/utils.go
··· 569 569 evtman := events.NewEventManager(diskpersist) 570 570 rf := indexer.NewRepoFetcher(maindb, repoman, 10) 571 571 572 - ix, err := indexer.NewIndexer(maindb, notifman, evtman, didr, rf, true, true, true) 572 + ix, err := indexer.NewIndexer(context.Background(), maindb, notifman, evtman, didr, rf, true, true, true) 573 573 if err != nil { 574 574 return nil, err 575 575 }