···1919 "github.com/bluesky-social/indigo/models"
2020 "github.com/bluesky-social/indigo/repomgr"
2121 "github.com/bluesky-social/indigo/util/svcutil"
2222- "github.com/bluesky-social/indigo/xrpc"
2322 lru "github.com/hashicorp/golang-lru/v2"
2423 ipld "github.com/ipfs/go-ipld-format"
2524 "github.com/labstack/echo/v4"
···3433var tracer = otel.Tracer("archiver")
35343635type Archiver struct {
3737- Index *Indexer
3836 db *gorm.DB
3937 slurper *bgs.Slurper
4038 didr did.Resolver
3939+4040+ crawler *CrawlDispatcher
41414242 // TODO: work on doing away with this flag in favor of more pluggable
4343 // pieces that abstract the need for explicit ssl checks
···6060 pdsResyncsLk sync.RWMutex
6161 pdsResyncs map[uint]*bgs.PDSResync
62626363- // Management of Compaction
6464- compactor *bgs.Compactor
6565-6663 // User cache
6764 userCache *lru.Cache[string, *User]
68656969- // nextCrawlers gets forwarded POST /xrpc/com.atproto.sync.requestCrawl
7070- nextCrawlers []*url.URL
7171- httpClient http.Client
6666+ httpClient http.Client
72677368 log *slog.Logger
7469}
···9691 }
9792}
98939999-func NewArchiver(db *gorm.DB, ix *Indexer, repoman *repomgr.RepoManager, didr did.Resolver, rf *RepoFetcher, config *ArchiverConfig) (*Archiver, error) {
9494+func NewArchiver(db *gorm.DB, repoman *repomgr.RepoManager, didr did.Resolver, rf *RepoFetcher, config *ArchiverConfig) (*Archiver, error) {
10095 if config == nil {
10196 config = DefaultArchiverConfig()
10297 }
···107102108103 uc, _ := lru.New[string, *User](1_000_000)
109104105105+ log := slog.Default().With("system", "archiver")
106106+107107+ c, err := NewCrawlDispatcher(rf, rf.MaxConcurrency, log)
108108+ if err != nil {
109109+ return nil, err
110110+ }
111111+112112+ c.Run()
113113+110114 arc := &Archiver{
111111- Index: ix,
112112- db: db,
115115+ crawler: c,
116116+ db: db,
113117114118 repoman: repoman,
115119 didr: didr,
···122126123127 userCache: uc,
124128125125- log: slog.Default().With("system", "archiver"),
129129+ log: log,
126130 }
127131128128- ix.CreateExternalUser = arc.handleUserUpdate
129132 slOpts := bgs.DefaultSlurperOptions()
130133 slOpts.SSL = config.SSL
131134 slOpts.DefaultRepoLimit = config.DefaultRepoLimit
···142145 return nil, err
143146 }
144147145145- cOpts := bgs.DefaultCompactorOptions()
146146- cOpts.NumWorkers = config.NumCompactionWorkers
147147- compactor := bgs.NewCompactor(cOpts)
148148- compactor.RequeueInterval = config.CompactInterval
149149- // TODO: compactor shenanigans
150150- //compactor.Start(arc)
151151- arc.compactor = compactor
152152-153153- arc.nextCrawlers = config.NextCrawlers
154148 arc.httpClient.Timeout = time.Second * 5
155149156150 return arc, nil
···163157 Did string `gorm:"uniqueindex"`
164158 PDS uint
165159166166- // TakenDown is set to true if the user in question has been taken down.
167167- // A user in this state will have all future events related to it dropped
168168- // and no data about this user will be served.
169169- TakenDown bool
170170- Tombstoned bool
171171-172160 // UpstreamStatus is the state of the user as reported by the upstream PDS
173161 UpstreamStatus string `gorm:"index"`
174162175163 lk sync.Mutex
176164}
177165178178-func (u *User) SetTakenDown(v bool) {
179179- u.lk.Lock()
180180- defer u.lk.Unlock()
181181- u.TakenDown = v
182182-}
183183-184184-func (u *User) GetTakenDown() bool {
185185- u.lk.Lock()
186186- defer u.lk.Unlock()
187187- return u.TakenDown
188188-}
189189-190190-func (u *User) SetTombstoned(v bool) {
191191- u.lk.Lock()
192192- defer u.lk.Unlock()
193193- u.Tombstoned = v
194194-}
195195-196196-func (u *User) GetTombstoned() bool {
197197- u.lk.Lock()
198198- defer u.lk.Unlock()
199199- return u.Tombstoned
200200-}
201201-202166func (u *User) SetUpstreamStatus(v string) {
203167 u.lk.Lock()
204168 defer u.lk.Unlock()
···220184 s.log.Debug("create external user", "did", did)
221185 doc, err := s.didr.GetDocument(ctx, did)
222186 if err != nil {
223223- return nil, fmt.Errorf("could not locate DID document for followed user (%s): %w", did, err)
187187+ return nil, fmt.Errorf("could not locate DID document for user (%s): %w", did, err)
224188 }
225189226190 if len(doc.Service) == 0 {
227227- return nil, fmt.Errorf("external followed user %s had no services in did document", did)
191191+ return nil, fmt.Errorf("user %s had no services in did document", did)
228192 }
229193230194 svc := doc.Service[0]
···244208 return nil, err
245209 }
246210247247- c := &xrpc.Client{Host: durl.String()}
248248- s.Index.ApplyPDSClientSettings(c)
249249-250211 if peering.ID == 0 {
251212 peering.Host = durl.Host
252213 peering.SSL = (durl.Scheme == "https")
···307268 s.extUserLk.Lock()
308269 defer s.extUserLk.Unlock()
309270310310- exu, err := s.Index.LookupUserByDid(ctx, did)
271271+ exu, err := s.LookupUserByDid(ctx, did)
311272 if err == nil {
312273 s.log.Debug("lost the race to create a new user", "did", did)
313274 if exu.PDS != peering.ID {
···361322 u, err := s.handleUserUpdate(ctx, evt.Repo)
362323 userLookupDuration.Observe(time.Since(st).Seconds())
363324 if err != nil {
364364- if !errors.Is(err, gorm.ErrRecordNotFound) {
365365- repoCommitsResultCounter.WithLabelValues(host.Host, "nou").Inc()
366366- return fmt.Errorf("looking up event user: %w", err)
367367- }
368368-369369- newUsersDiscovered.Inc()
370370- start := time.Now()
371371- subj, err := s.handleUserUpdate(ctx, evt.Repo)
372372- newUserDiscoveryDuration.Observe(time.Since(start).Seconds())
373373- if err != nil {
374374- repoCommitsResultCounter.WithLabelValues(host.Host, "uerr").Inc()
375375- return fmt.Errorf("fed event create external user: %w", err)
376376- }
377377-378378- u = subj
325325+ return fmt.Errorf("looking up event user: %w", err)
379326 }
380327381328 ustatus := u.GetUpstreamStatus()
382329 span.SetAttributes(attribute.String("upstream_status", ustatus))
383330384384- if u.GetTakenDown() || ustatus == events.AccountStatusTakendown {
385385- span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.GetTakenDown()))
331331+ switch ustatus {
332332+ case events.AccountStatusTakendown:
386333 s.log.Debug("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
387334 repoCommitsResultCounter.WithLabelValues(host.Host, "tdu").Inc()
388335 return nil
389389- }
390390-391391- if ustatus == events.AccountStatusSuspended {
336336+ case events.AccountStatusSuspended:
392337 s.log.Debug("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
393338 repoCommitsResultCounter.WithLabelValues(host.Host, "susu").Inc()
394339 return nil
395395- }
396396-397397- if ustatus == events.AccountStatusDeactivated {
340340+ case events.AccountStatusDeactivated:
398341 s.log.Debug("dropping commit event from deactivated user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
399342 repoCommitsResultCounter.WithLabelValues(host.Host, "du").Inc()
400343 return nil
···405348 return fmt.Errorf("rebase was true in event seq:%d,host:%s", evt.Seq, host.Host)
406349 }
407350408408- if host.ID != u.PDS && u.PDS != 0 {
351351+ if host.ID != u.PDS && u.PDS != 0 && !host.Trusted {
409352 s.log.Warn("received event for repo from different pds than expected", "repo", evt.Repo, "expPds", u.PDS, "gotPds", host.Host)
410353 // Flush any cached DID documents for this user
411354 s.didr.FlushCacheFor(env.RepoCommit.Repo)
···422365 }
423366 }
424367425425- if u.GetTombstoned() {
426426- span.SetAttributes(attribute.Bool("tombstoned", true))
427427- // we've checked the authority of the users PDS, so reinstate the account
428428- if err := s.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumn("tombstoned", false).Error; err != nil {
429429- repoCommitsResultCounter.WithLabelValues(host.Host, "tomb").Inc()
430430- return fmt.Errorf("failed to un-tombstone a user: %w", err)
431431- }
432432- u.SetTombstoned(false)
433433-434434- ai, err := s.Index.LookupUser(ctx, u.ID)
435435- if err != nil {
436436- repoCommitsResultCounter.WithLabelValues(host.Host, "nou2").Inc()
437437- return fmt.Errorf("failed to look up user (tombstone recover): %w", err)
438438- }
439439-440440- // Now a simple re-crawl should suffice to bring the user back online
441441- repoCommitsResultCounter.WithLabelValues(host.Host, "catchupt").Inc()
442442- return s.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
443443- }
444444-445368 // skip the fast path for rebases or if the user is already in the slow path
446446- if s.Index.Crawler.RepoInSlowPath(ctx, u.ID) {
447447- ai, err := s.Index.LookupUser(ctx, u.ID)
369369+ if s.crawler.RepoInSlowPath(ctx, u.ID) {
370370+ ai, err := s.LookupUser(ctx, u.ID)
448371 if err != nil {
449372 repoCommitsResultCounter.WithLabelValues(host.Host, "nou3").Inc()
450373 return fmt.Errorf("failed to look up user (slow path): %w", err)
···459382 // whether or not we even need this 'slow path' logic, as it makes
460383 // accounting for which events have been processed much harder
461384 repoCommitsResultCounter.WithLabelValues(host.Host, "catchup").Inc()
462462- return s.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
385385+ return s.crawler.AddToCatchupQueue(ctx, host, ai, evt)
463386 }
464387465388 if err := s.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, evt.Since, evt.Rev, evt.Blocks, evt.Ops); err != nil {
466389467390 if errors.Is(err, carstore.ErrRepoBaseMismatch) || ipld.IsNotFound(err) {
468468- ai, lerr := s.Index.LookupUser(ctx, u.ID)
391391+ ai, lerr := s.LookupUser(ctx, u.ID)
469392 if lerr != nil {
470393 log.Warn("failed handling event, no user", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "commit", evt.Commit.String())
471394 repoCommitsResultCounter.WithLabelValues(host.Host, "nou4").Inc()
···476399477400 log.Info("failed handling event, catchup", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "commit", evt.Commit.String())
478401 repoCommitsResultCounter.WithLabelValues(host.Host, "catchup2").Inc()
479479- return s.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
402402+ return s.crawler.AddToCatchupQueue(ctx, host, ai, evt)
480403 }
481404482405 log.Warn("failed handling event", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "commit", evt.Commit.String())
···487410 repoCommitsResultCounter.WithLabelValues(host.Host, "ok").Inc()
488411 return nil
489412 case env.RepoIdentity != nil:
490490- s.log.Info("bgs got identity event", "did", env.RepoIdentity.Did)
413413+ s.log.Info("archiver got identity event", "did", env.RepoIdentity.Did)
491414 // Flush any cached DID documents for this user
492415 s.didr.FlushCacheFor(env.RepoIdentity.Did)
493416···509432 span.SetAttributes(attribute.String("repo_status", *env.RepoAccount.Status))
510433 }
511434512512- s.log.Info("bgs got account event", "did", env.RepoAccount.Did)
435435+ s.log.Info("archiver got account event", "did", env.RepoAccount.Did)
513436 // Flush any cached DID documents for this user
514437 s.didr.FlushCacheFor(env.RepoAccount.Did)
515438···823746func (s *Archiver) Shutdown() []error {
824747 errs := s.slurper.Shutdown()
825748826826- s.compactor.Shutdown()
749749+ if s.crawler != nil {
750750+ s.crawler.Shutdown()
751751+ }
827752828753 return errs
829754}
+9-10
archiver/crawler.go
···199199 c.maplk.Lock()
200200 defer c.maplk.Unlock()
201201202202- // If the actor crawl is enqueued, we can append to the catchup queue which gets emptied during the crawl
202202+ // If the actor crawl is enqueued, we can ignore this event as it will
203203+ // be included in the repo when we fetch it
203204 job, ok := c.todo[catchup.user.ID]
204205 if ok {
205206 catchupEventsEnqueued.WithLabelValues("todo").Inc()
···221222 act: catchup.user,
222223 catchup: []*catchupJob{catchup},
223224 }
225225+224226 c.todo[catchup.user.ID] = cw
225227 return cw
226228}
227229228230func (c *CrawlDispatcher) fetchWorker() {
229229- for {
230230- select {
231231- case job := <-c.repoSync:
232232- if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
233233- c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
234234- }
235235-236236- // TODO: do we still just do this if it errors?
237237- c.complete <- job.act.ID
231231+ for job := range c.repoSync {
232232+ if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
233233+ c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
238234 }
235235+236236+ // TODO: do we still just do this if it errors?
237237+ c.complete <- job.act.ID
239238 }
240239}
241240
+7-18
archiver/handlers.go
···3636 return echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
3737 }
38383939- if u.GetTombstoned() {
4040- return fmt.Errorf("account was deleted")
4141- }
4242-4343- if u.GetTakenDown() {
4444- return fmt.Errorf("account was taken down by the Relay")
4545- }
4646-4739 ustatus := u.GetUpstreamStatus()
4848- if ustatus == events.AccountStatusTakendown {
4040+ switch ustatus {
4141+ case events.AccountStatusTakendown:
4942 return fmt.Errorf("account was taken down by its PDS")
5050- }
5151-5252- if ustatus == events.AccountStatusDeactivated {
4343+ case events.AccountStatusDeactivated:
5344 return fmt.Errorf("account is temporarily deactivated")
5454- }
5555-5656- if ustatus == events.AccountStatusSuspended {
4545+ case events.AccountStatusSuspended:
5746 return fmt.Errorf("account is suspended by its PDS")
5847 }
5948···7463 if err := s.db.Exec("SELECT 1").Error; err != nil {
7564 s.log.Error("healthcheck can't connect to database", "err", err)
7665 return c.JSON(500, HealthStatus{Status: "error", Message: "can't connect to database"})
7777- } else {
7878- return c.JSON(200, HealthStatus{Status: "ok"})
7966 }
6767+6868+ return c.JSON(200, HealthStatus{Status: "ok"})
8069}
// homeMessage is the placeholder banner text for the archiver.
// NOTE(review): presumably served by the root HTTP handler; confirm against
// the handler that references it.
var homeMessage = `
[ insert fancy archiver art here ]
`
8574
+16-78
archiver/loader.go
···44 "context"
55 "errors"
66 "fmt"
77- "log/slog"
8799- "github.com/bluesky-social/indigo/did"
1010- "github.com/bluesky-social/indigo/events"
118 "github.com/bluesky-social/indigo/models"
1212- "github.com/bluesky-social/indigo/xrpc"
1391410 "go.opentelemetry.io/otel"
1511 "gorm.io/gorm"
1612)
17131818-const MaxEventSliceLength = 1000000
1919-const MaxOpsSliceLength = 200
2020-2121-type Indexer struct {
2222- db *gorm.DB
2323-2424- didr did.Resolver
2525-2626- Crawler *CrawlDispatcher
2727-2828- CreateExternalUser func(context.Context, string) (*User, error)
2929- ApplyPDSClientSettings func(*xrpc.Client)
3030-3131- log *slog.Logger
3232-}
3333-3434-type AddEventFunc func(ctx context.Context, ev *events.XRPCStreamEvent) error
3535-3636-func NewIndexer(db *gorm.DB, didr did.Resolver, fetcher *RepoFetcher, crawl bool) (*Indexer, error) {
3737- ix := &Indexer{
3838- db: db,
3939- didr: didr,
4040- ApplyPDSClientSettings: func(*xrpc.Client) {},
4141- log: slog.Default().With("system", "indexer"),
4242- }
4343-4444- if crawl {
4545- c, err := NewCrawlDispatcher(fetcher, fetcher.MaxConcurrency, ix.log)
4646- if err != nil {
4747- return nil, err
4848- }
4949-5050- ix.Crawler = c
5151- ix.Crawler.Run()
5252- }
5353-5454- return ix, nil
5555-}
5656-5757-func (ix *Indexer) Shutdown() {
5858- if ix.Crawler != nil {
5959- ix.Crawler.Shutdown()
6060- }
6161-}
6262-6363-func (ix *Indexer) GetUserOrMissing(ctx context.Context, did string) (*User, error) {
1414+func (s *Archiver) GetUserOrMissing(ctx context.Context, did string) (*User, error) {
6415 ctx, span := otel.Tracer("indexer").Start(ctx, "getUserOrMissing")
6516 defer span.End()
66176767- ai, err := ix.LookupUserByDid(ctx, did)
1818+ ai, err := s.LookupUserByDid(ctx, did)
6819 if err == nil {
6920 return ai, nil
7021 }
···7425 }
75267627 // unknown user... create it and send it off to the crawler
7777- return ix.createMissingUserRecord(ctx, did)
2828+ return s.createMissingUserRecord(ctx, did)
7829}
79308080-func (ix *Indexer) createMissingUserRecord(ctx context.Context, did string) (*User, error) {
3131+func (s *Archiver) createMissingUserRecord(ctx context.Context, did string) (*User, error) {
8132 ctx, span := otel.Tracer("indexer").Start(ctx, "createMissingUserRecord")
8233 defer span.End()
83348435 externalUserCreationAttempts.Inc()
85368686- ai, err := ix.CreateExternalUser(ctx, did)
3737+ ai, err := s.handleUserUpdate(ctx, did)
8738 if err != nil {
8839 return nil, err
8940 }
90419191- if err := ix.addUserToCrawler(ctx, ai); err != nil {
4242+ if err := s.addUserToCrawler(ctx, ai); err != nil {
9243 return nil, fmt.Errorf("failed to add unknown user to crawler: %w", err)
9344 }
94459546 return ai, nil
9647}
97489898-func (ix *Indexer) addUserToCrawler(ctx context.Context, ai *User) error {
9999- ix.log.Debug("Sending user to crawler: ", "did", ai.Did)
100100- if ix.Crawler == nil {
4949+func (s *Archiver) addUserToCrawler(ctx context.Context, ai *User) error {
5050+ s.log.Debug("Sending user to crawler: ", "did", ai.Did)
5151+ if s.crawler == nil {
10152 return nil
10253 }
10354104104- return ix.Crawler.Crawl(ctx, ai)
5555+ return s.crawler.Crawl(ctx, ai)
10556}
10657107107-func (ix *Indexer) DidForUser(ctx context.Context, uid models.Uid) (string, error) {
5858+func (s *Archiver) DidForUser(ctx context.Context, uid models.Uid) (string, error) {
10859 var ai User
109109- if err := ix.db.First(&ai, "uid = ?", uid).Error; err != nil {
6060+ if err := s.db.First(&ai, "id = ?", uid).Error; err != nil {
11061 return "", err
11162 }
1126311364 return ai.Did, nil
11465}
11566116116-func (ix *Indexer) LookupUser(ctx context.Context, id models.Uid) (*User, error) {
6767+func (s *Archiver) LookupUser(ctx context.Context, id models.Uid) (*User, error) {
11768 var ai User
118118- if err := ix.db.First(&ai, "uid = ?", id).Error; err != nil {
6969+ if err := s.db.First(&ai, "id = ?", id).Error; err != nil {
11970 return nil, err
12071 }
1217212273 return &ai, nil
12374}
12475125125-func (ix *Indexer) LookupUserByDid(ctx context.Context, did string) (*User, error) {
7676+func (s *Archiver) LookupUserByDid(ctx context.Context, did string) (*User, error) {
12677 var ai User
127127- if err := ix.db.Find(&ai, "did = ?", did).Error; err != nil {
128128- return nil, err
129129- }
130130-131131- if ai.ID == 0 {
132132- return nil, gorm.ErrRecordNotFound
133133- }
134134-135135- return &ai, nil
136136-}
137137-138138-func (ix *Indexer) LookupUserByHandle(ctx context.Context, handle string) (*User, error) {
139139- var ai User
140140- if err := ix.db.Find(&ai, "handle = ?", handle).Error; err != nil {
7878+ if err := s.db.Find(&ai, "did = ?", did).Error; err != nil {
14179 return nil, err
14280 }
14381