···11+package archiver
22+33+import (
44+ "context"
55+ "database/sql"
66+ "errors"
77+ "fmt"
88+ "log/slog"
99+ "net"
1010+ "net/http"
1111+ "net/url"
1212+ "strings"
1313+ "sync"
1414+ "time"
1515+1616+ "github.com/bluesky-social/indigo/api/atproto"
1717+ "github.com/bluesky-social/indigo/bgs"
1818+ "github.com/bluesky-social/indigo/carstore"
1919+ "github.com/bluesky-social/indigo/did"
2020+ "github.com/bluesky-social/indigo/events"
2121+ "github.com/bluesky-social/indigo/handles"
2222+ "github.com/bluesky-social/indigo/models"
2323+ "github.com/bluesky-social/indigo/repomgr"
2424+ "github.com/bluesky-social/indigo/util/svcutil"
2525+ "github.com/bluesky-social/indigo/xrpc"
2626+ lru "github.com/hashicorp/golang-lru/v2"
2727+ ipld "github.com/ipfs/go-ipld-format"
2828+ "github.com/labstack/echo/v4"
2929+ "github.com/labstack/echo/v4/middleware"
3030+ "github.com/labstack/gommon/log"
3131+ "github.com/prometheus/client_golang/prometheus/promhttp"
3232+ "go.opentelemetry.io/otel"
3333+ "go.opentelemetry.io/otel/attribute"
3434+ "gorm.io/gorm"
3535+)
3636+3737+var tracer = otel.Tracer("archiver")
3838+3939+type Archiver struct {
4040+ Index *Indexer
4141+ db *gorm.DB
4242+ slurper *bgs.Slurper
4343+ didr did.Resolver
4444+4545+ hr handles.HandleResolver
4646+4747+ // TODO: work on doing away with this flag in favor of more pluggable
4848+ // pieces that abstract the need for explicit ssl checks
4949+ ssl bool
5050+5151+ crawlOnly bool
5252+5353+ // TODO: at some point we will want to lock specific DIDs; this lock as-is
5454+ // is overly broad, but I don't expect it to be a bottleneck for now
5555+ extUserLk sync.Mutex
5656+5757+ repoman *repomgr.RepoManager
5858+5959+ // Management of Socket Consumers
6060+ consumersLk sync.RWMutex
6161+ nextConsumerID uint64
6262+ consumers map[uint64]*bgs.SocketConsumer
6363+6464+ // Management of Resyncs
6565+ pdsResyncsLk sync.RWMutex
6666+ pdsResyncs map[uint]*bgs.PDSResync
6767+6868+ // Management of Compaction
6969+ compactor *bgs.Compactor
7070+7171+ // User cache
7272+ userCache *lru.Cache[string, *User]
7373+7474+ // nextCrawlers gets forwarded POST /xrpc/com.atproto.sync.requestCrawl
7575+ nextCrawlers []*url.URL
7676+ httpClient http.Client
7777+7878+ log *slog.Logger
7979+}
8080+8181+type ArchiverConfig struct {
8282+ SSL bool
8383+ CompactInterval time.Duration
8484+ DefaultRepoLimit int64
8585+ ConcurrencyPerPDS int64
8686+ MaxQueuePerPDS int64
8787+ NumCompactionWorkers int
8888+8989+ // NextCrawlers gets forwarded POST /xrpc/com.atproto.sync.requestCrawl
9090+ NextCrawlers []*url.URL
9191+}
9292+9393+func DefaultArchiverConfig() *ArchiverConfig {
9494+ return &ArchiverConfig{
9595+ SSL: true,
9696+ CompactInterval: 4 * time.Hour,
9797+ DefaultRepoLimit: 100,
9898+ ConcurrencyPerPDS: 100,
9999+ MaxQueuePerPDS: 1_000,
100100+ NumCompactionWorkers: 2,
101101+ }
102102+}
103103+104104+func NewArchiver(db *gorm.DB, ix *Indexer, repoman *repomgr.RepoManager, didr did.Resolver, rf *RepoFetcher, hr handles.HandleResolver, config *ArchiverConfig) (*Archiver, error) {
105105+ if config == nil {
106106+ config = DefaultArchiverConfig()
107107+ }
108108+ db.AutoMigrate(User{})
109109+ db.AutoMigrate(AuthToken{})
110110+ db.AutoMigrate(models.PDS{})
111111+ db.AutoMigrate(models.DomainBan{})
112112+113113+ uc, _ := lru.New[string, *User](1_000_000)
114114+115115+ arc := &Archiver{
116116+ Index: ix,
117117+ db: db,
118118+119119+ hr: hr,
120120+ repoman: repoman,
121121+ didr: didr,
122122+ ssl: config.SSL,
123123+124124+ consumersLk: sync.RWMutex{},
125125+ consumers: make(map[uint64]*bgs.SocketConsumer),
126126+127127+ pdsResyncs: make(map[uint]*bgs.PDSResync),
128128+129129+ userCache: uc,
130130+131131+ log: slog.Default().With("system", "archiver"),
132132+ }
133133+134134+ ix.CreateExternalUser = arc.handleUserUpdate
135135+ slOpts := bgs.DefaultSlurperOptions()
136136+ slOpts.SSL = config.SSL
137137+ slOpts.DefaultRepoLimit = config.DefaultRepoLimit
138138+ slOpts.ConcurrencyPerPDS = config.ConcurrencyPerPDS
139139+ slOpts.MaxQueuePerPDS = config.MaxQueuePerPDS
140140+ s, err := bgs.NewSlurper(db, arc.handleFedEvent, slOpts)
141141+ if err != nil {
142142+ return nil, err
143143+ }
144144+145145+ arc.slurper = s
146146+147147+ if err := arc.slurper.RestartAll(); err != nil {
148148+ return nil, err
149149+ }
150150+151151+ cOpts := bgs.DefaultCompactorOptions()
152152+ cOpts.NumWorkers = config.NumCompactionWorkers
153153+ compactor := bgs.NewCompactor(cOpts)
154154+ compactor.RequeueInterval = config.CompactInterval
155155+ // TODO: compactor shenanigans
156156+ //compactor.Start(arc)
157157+ arc.compactor = compactor
158158+159159+ arc.nextCrawlers = config.NextCrawlers
160160+ arc.httpClient.Timeout = time.Second * 5
161161+162162+ return arc, nil
163163+164164+}
165165+166166+type User struct {
167167+ gorm.Model
168168+ ID models.Uid `gorm:"primarykey;index:idx_user_id_active,where:taken_down = false AND tombstoned = false"`
169169+ Handle sql.NullString `gorm:"index"`
170170+ DisplayName string
171171+ Did string `gorm:"uniqueindex"`
172172+ Following int64
173173+ Followers int64
174174+ Posts int64
175175+ Type string
176176+ PDS uint
177177+ ValidHandle bool `gorm:"default:true"`
178178+179179+ // TakenDown is set to true if the user in question has been taken down.
180180+ // A user in this state will have all future events related to it dropped
181181+ // and no data about this user will be served.
182182+ TakenDown bool
183183+ Tombstoned bool
184184+185185+ // UpstreamStatus is the state of the user as reported by the upstream PDS
186186+ UpstreamStatus string `gorm:"index"`
187187+188188+ lk sync.Mutex
189189+}
190190+191191+func (u *User) SetTakenDown(v bool) {
192192+ u.lk.Lock()
193193+ defer u.lk.Unlock()
194194+ u.TakenDown = v
195195+}
196196+197197+func (u *User) GetTakenDown() bool {
198198+ u.lk.Lock()
199199+ defer u.lk.Unlock()
200200+ return u.TakenDown
201201+}
202202+203203+func (u *User) SetTombstoned(v bool) {
204204+ u.lk.Lock()
205205+ defer u.lk.Unlock()
206206+ u.Tombstoned = v
207207+}
208208+209209+func (u *User) GetTombstoned() bool {
210210+ u.lk.Lock()
211211+ defer u.lk.Unlock()
212212+ return u.Tombstoned
213213+}
214214+215215+func (u *User) SetUpstreamStatus(v string) {
216216+ u.lk.Lock()
217217+ defer u.lk.Unlock()
218218+ u.UpstreamStatus = v
219219+}
220220+221221+func (u *User) GetUpstreamStatus() string {
222222+ u.lk.Lock()
223223+ defer u.lk.Unlock()
224224+ return u.UpstreamStatus
225225+}
226226+227227+func (s *Archiver) handleUserUpdate(ctx context.Context, did string) (*User, error) {
228228+ ctx, span := tracer.Start(ctx, "handleUserUpdate")
229229+ defer span.End()
230230+231231+ externalUserCreationAttempts.Inc()
232232+233233+ s.log.Debug("create external user", "did", did)
234234+ doc, err := s.didr.GetDocument(ctx, did)
235235+ if err != nil {
236236+ return nil, fmt.Errorf("could not locate DID document for followed user (%s): %w", did, err)
237237+ }
238238+239239+ if len(doc.Service) == 0 {
240240+ return nil, fmt.Errorf("external followed user %s had no services in did document", did)
241241+ }
242242+243243+ svc := doc.Service[0]
244244+ durl, err := url.Parse(svc.ServiceEndpoint)
245245+ if err != nil {
246246+ return nil, err
247247+ }
248248+249249+ if strings.HasPrefix(durl.Host, "localhost:") {
250250+ durl.Scheme = "http"
251251+ }
252252+253253+ // TODO: the PDS's DID should also be in the service, we could use that to look up?
254254+ var peering models.PDS
255255+ if err := s.db.Find(&peering, "host = ?", durl.Host).Error; err != nil {
256256+ s.log.Error("failed to find pds", "host", durl.Host)
257257+ return nil, err
258258+ }
259259+260260+ /*
261261+ // TODO: ignore this because we're just gonna get the stream from the relay anyways
262262+ ban, err := s.domainIsBanned(ctx, durl.Host)
263263+ if err != nil {
264264+ return nil, fmt.Errorf("failed to check pds ban status: %w", err)
265265+ }
266266+267267+ if ban {
268268+ return nil, fmt.Errorf("cannot create user on pds with banned domain")
269269+ }
270270+ */
271271+272272+ c := &xrpc.Client{Host: durl.String()}
273273+ s.Index.ApplyPDSClientSettings(c)
274274+275275+ if peering.ID == 0 {
276276+ // TODO: the case of handling a new user on a new PDS probably requires more thought
277277+ cfg, err := atproto.ServerDescribeServer(ctx, c)
278278+ if err != nil {
279279+ // TODO: failing this shouldn't halt our indexing
280280+ return nil, fmt.Errorf("failed to check unrecognized pds: %w", err)
281281+ }
282282+283283+ // since handles can be anything, checking against this list doesn't matter...
284284+ _ = cfg
285285+286286+ // TODO: could check other things, a valid response is good enough for now
287287+ peering.Host = durl.Host
288288+ peering.SSL = (durl.Scheme == "https")
289289+ peering.CrawlRateLimit = float64(s.slurper.DefaultCrawlLimit)
290290+ peering.RateLimit = float64(s.slurper.DefaultPerSecondLimit)
291291+ peering.HourlyEventLimit = s.slurper.DefaultPerHourLimit
292292+ peering.DailyEventLimit = s.slurper.DefaultPerDayLimit
293293+ peering.RepoLimit = s.slurper.DefaultRepoLimit
294294+295295+ if s.ssl && !peering.SSL {
296296+ return nil, fmt.Errorf("did references non-ssl PDS, this is disallowed in prod: %q %q", did, svc.ServiceEndpoint)
297297+ }
298298+299299+ if err := s.db.Create(&peering).Error; err != nil {
300300+ return nil, err
301301+ }
302302+ }
303303+304304+ if peering.ID == 0 {
305305+ panic("somehow failed to create a pds entry?")
306306+ }
307307+308308+ if peering.Blocked {
309309+ return nil, fmt.Errorf("refusing to create user with blocked PDS")
310310+ }
311311+312312+ if peering.RepoCount >= peering.RepoLimit {
313313+ return nil, fmt.Errorf("refusing to create user on PDS at max repo limit for pds %q", peering.Host)
314314+ }
315315+316316+ // Increment the repo count for the PDS
317317+ res := s.db.Model(&models.PDS{}).Where("id = ? AND repo_count < repo_limit", peering.ID).Update("repo_count", gorm.Expr("repo_count + 1"))
318318+ if res.Error != nil {
319319+ return nil, fmt.Errorf("failed to increment repo count for pds %q: %w", peering.Host, res.Error)
320320+ }
321321+322322+ if res.RowsAffected == 0 {
323323+ return nil, fmt.Errorf("refusing to create user on PDS at max repo limit for pds %q", peering.Host)
324324+ }
325325+326326+ successfullyCreated := false
327327+328328+ // Release the count if we fail to create the user
329329+ defer func() {
330330+ if !successfullyCreated {
331331+ if err := s.db.Model(&models.PDS{}).Where("id = ?", peering.ID).Update("repo_count", gorm.Expr("repo_count - 1")).Error; err != nil {
332332+ s.log.Error("failed to decrement repo count for pds", "err", err)
333333+ }
334334+ }
335335+ }()
336336+337337+ if len(doc.AlsoKnownAs) == 0 {
338338+ return nil, fmt.Errorf("user has no 'known as' field in their DID document")
339339+ }
340340+341341+ hurl, err := url.Parse(doc.AlsoKnownAs[0])
342342+ if err != nil {
343343+ return nil, err
344344+ }
345345+346346+ s.log.Debug("creating external user", "did", did, "handle", hurl.Host, "pds", peering.ID)
347347+348348+ handle := hurl.Host
349349+350350+ validHandle := true
351351+352352+ resdid, err := s.hr.ResolveHandleToDid(ctx, handle)
353353+ if err != nil {
354354+ s.log.Error("failed to resolve users claimed handle on pds", "handle", handle, "err", err)
355355+ validHandle = false
356356+ }
357357+358358+ if resdid != did {
359359+ s.log.Error("claimed handle did not match servers response", "resdid", resdid, "did", did)
360360+ validHandle = false
361361+ }
362362+363363+ s.extUserLk.Lock()
364364+ defer s.extUserLk.Unlock()
365365+366366+ exu, err := s.Index.LookupUserByDid(ctx, did)
367367+ if err == nil {
368368+ s.log.Debug("lost the race to create a new user", "did", did, "handle", handle, "existing_hand", exu.Handle)
369369+ if exu.PDS != peering.ID {
370370+ // User is now on a different PDS, update
371371+ if err := s.db.Model(User{}).Where("id = ?", exu.ID).Update("pds", peering.ID).Error; err != nil {
372372+ return nil, fmt.Errorf("failed to update users pds: %w", err)
373373+ }
374374+375375+ exu.PDS = peering.ID
376376+ }
377377+378378+ if exu.Handle.String != handle {
379379+ // Users handle has changed, update
380380+ if err := s.db.Model(User{}).Where("id = ?", exu.ID).Update("handle", handle).Error; err != nil {
381381+ return nil, fmt.Errorf("failed to update users handle: %w", err)
382382+ }
383383+384384+ exu.Handle = sql.NullString{String: handle, Valid: true}
385385+ }
386386+ return exu, nil
387387+ }
388388+389389+ if !errors.Is(err, gorm.ErrRecordNotFound) {
390390+ return nil, err
391391+ }
392392+393393+ // TODO: request this users info from their server to fill out our data...
394394+ u := User{
395395+ Did: did,
396396+ PDS: peering.ID,
397397+ ValidHandle: validHandle,
398398+ }
399399+ if validHandle {
400400+ u.Handle = sql.NullString{String: handle, Valid: true}
401401+ }
402402+403403+ if err := s.db.Create(&u).Error; err != nil {
404404+ // If the new user's handle conflicts with an existing user,
405405+ // since we just validated the handle for this user, we'll assume
406406+ // the existing user no longer has control of the handle
407407+ if errors.Is(err, gorm.ErrDuplicatedKey) {
408408+ // Get the UID of the existing user
409409+ var existingUser User
410410+ if err := s.db.Find(&existingUser, "handle = ?", handle).Error; err != nil {
411411+ return nil, fmt.Errorf("failed to find existing user: %w", err)
412412+ }
413413+414414+ // Set the existing user's handle to NULL and set the valid_handle flag to false
415415+ if err := s.db.Model(User{}).Where("id = ?", existingUser.ID).Update("handle", nil).Update("valid_handle", false).Error; err != nil {
416416+ return nil, fmt.Errorf("failed to update outdated user's handle: %w", err)
417417+ }
418418+419419+ // Create the new user
420420+ if err := s.db.Create(&u).Error; err != nil {
421421+ return nil, fmt.Errorf("failed to create user after handle conflict: %w", err)
422422+ }
423423+424424+ s.userCache.Remove(did)
425425+ } else {
426426+ return nil, fmt.Errorf("failed to create other pds user: %w", err)
427427+ }
428428+ }
429429+430430+ successfullyCreated = true
431431+432432+ return &u, nil
433433+}
434434+435435+func (s *Archiver) handleFedEvent(ctx context.Context, host *models.PDS, env *events.XRPCStreamEvent) error {
436436+ ctx, span := tracer.Start(ctx, "handleFedEvent")
437437+ defer span.End()
438438+439439+ start := time.Now()
440440+ defer func() {
441441+ eventsHandleDuration.WithLabelValues(host.Host).Observe(time.Since(start).Seconds())
442442+ }()
443443+444444+ eventsReceivedCounter.WithLabelValues(host.Host).Add(1)
445445+446446+ switch {
447447+ case env.RepoCommit != nil:
448448+ repoCommitsReceivedCounter.WithLabelValues(host.Host).Add(1)
449449+ evt := env.RepoCommit
450450+ s.log.Debug("archiver got repo append event", "seq", evt.Seq, "pdsHost", host.Host, "repo", evt.Repo)
451451+452452+ st := time.Now()
453453+ u, err := s.handleUserUpdate(ctx, evt.Repo)
454454+ userLookupDuration.Observe(time.Since(st).Seconds())
455455+ if err != nil {
456456+ if !errors.Is(err, gorm.ErrRecordNotFound) {
457457+ repoCommitsResultCounter.WithLabelValues(host.Host, "nou").Inc()
458458+ return fmt.Errorf("looking up event user: %w", err)
459459+ }
460460+461461+ newUsersDiscovered.Inc()
462462+ start := time.Now()
463463+ subj, err := s.handleUserUpdate(ctx, evt.Repo)
464464+ newUserDiscoveryDuration.Observe(time.Since(start).Seconds())
465465+ if err != nil {
466466+ repoCommitsResultCounter.WithLabelValues(host.Host, "uerr").Inc()
467467+ return fmt.Errorf("fed event create external user: %w", err)
468468+ }
469469+470470+ u = subj
471471+ }
472472+473473+ ustatus := u.GetUpstreamStatus()
474474+ span.SetAttributes(attribute.String("upstream_status", ustatus))
475475+476476+ if u.GetTakenDown() || ustatus == events.AccountStatusTakendown {
477477+ span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.GetTakenDown()))
478478+ s.log.Debug("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
479479+ repoCommitsResultCounter.WithLabelValues(host.Host, "tdu").Inc()
480480+ return nil
481481+ }
482482+483483+ if ustatus == events.AccountStatusSuspended {
484484+ s.log.Debug("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
485485+ repoCommitsResultCounter.WithLabelValues(host.Host, "susu").Inc()
486486+ return nil
487487+ }
488488+489489+ if ustatus == events.AccountStatusDeactivated {
490490+ s.log.Debug("dropping commit event from deactivated user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
491491+ repoCommitsResultCounter.WithLabelValues(host.Host, "du").Inc()
492492+ return nil
493493+ }
494494+495495+ if evt.Rebase {
496496+ repoCommitsResultCounter.WithLabelValues(host.Host, "rebase").Inc()
497497+ return fmt.Errorf("rebase was true in event seq:%d,host:%s", evt.Seq, host.Host)
498498+ }
499499+500500+ if host.ID != u.PDS && u.PDS != 0 {
501501+ s.log.Warn("received event for repo from different pds than expected", "repo", evt.Repo, "expPds", u.PDS, "gotPds", host.Host)
502502+ // Flush any cached DID documents for this user
503503+ s.didr.FlushCacheFor(env.RepoCommit.Repo)
504504+505505+ subj, err := s.handleUserUpdate(ctx, evt.Repo)
506506+ if err != nil {
507507+ repoCommitsResultCounter.WithLabelValues(host.Host, "uerr2").Inc()
508508+ return err
509509+ }
510510+511511+ if subj.PDS != host.ID {
512512+ repoCommitsResultCounter.WithLabelValues(host.Host, "noauth").Inc()
513513+ return fmt.Errorf("event from non-authoritative pds")
514514+ }
515515+ }
516516+517517+ if u.GetTombstoned() {
518518+ span.SetAttributes(attribute.Bool("tombstoned", true))
519519+ // we've checked the authority of the users PDS, so reinstate the account
520520+ if err := s.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumn("tombstoned", false).Error; err != nil {
521521+ repoCommitsResultCounter.WithLabelValues(host.Host, "tomb").Inc()
522522+ return fmt.Errorf("failed to un-tombstone a user: %w", err)
523523+ }
524524+ u.SetTombstoned(false)
525525+526526+ ai, err := s.Index.LookupUser(ctx, u.ID)
527527+ if err != nil {
528528+ repoCommitsResultCounter.WithLabelValues(host.Host, "nou2").Inc()
529529+ return fmt.Errorf("failed to look up user (tombstone recover): %w", err)
530530+ }
531531+532532+ // Now a simple re-crawl should suffice to bring the user back online
533533+ repoCommitsResultCounter.WithLabelValues(host.Host, "catchupt").Inc()
534534+ return s.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
535535+ }
536536+537537+ // skip the fast path for rebases or if the user is already in the slow path
538538+ if s.Index.Crawler.RepoInSlowPath(ctx, u.ID) {
539539+ ai, err := s.Index.LookupUser(ctx, u.ID)
540540+ if err != nil {
541541+ repoCommitsResultCounter.WithLabelValues(host.Host, "nou3").Inc()
542542+ return fmt.Errorf("failed to look up user (slow path): %w", err)
543543+ }
544544+545545+ // TODO: we currently do not handle events that get queued up
546546+ // behind an already 'in progress' slow path event.
547547+ // this is strictly less efficient than it could be, and while it
548548+ // does 'work' (due to falling back to resyncing the repo), its
549549+ // technically incorrect. Now that we have the parallel event
550550+ // processor coming off of the pds stream, we should investigate
551551+ // whether or not we even need this 'slow path' logic, as it makes
552552+ // accounting for which events have been processed much harder
553553+ repoCommitsResultCounter.WithLabelValues(host.Host, "catchup").Inc()
554554+ return s.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
555555+ }
556556+557557+ if err := s.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, evt.Since, evt.Rev, evt.Blocks, evt.Ops); err != nil {
558558+559559+ if errors.Is(err, carstore.ErrRepoBaseMismatch) || ipld.IsNotFound(err) {
560560+ ai, lerr := s.Index.LookupUser(ctx, u.ID)
561561+ if lerr != nil {
562562+ log.Warn("failed handling event, no user", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "commit", evt.Commit.String())
563563+ repoCommitsResultCounter.WithLabelValues(host.Host, "nou4").Inc()
564564+ return fmt.Errorf("failed to look up user %s (%d) (err case: %s): %w", u.Did, u.ID, err, lerr)
565565+ }
566566+567567+ span.SetAttributes(attribute.Bool("catchup_queue", true))
569569+ s.log.Info("failed handling event, catchup", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "commit", evt.Commit.String())
570570+ repoCommitsResultCounter.WithLabelValues(host.Host, "catchup2").Inc()
571571+ return s.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
572572+ }
574574+ s.log.Warn("failed handling event", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "commit", evt.Commit.String())
575575+ repoCommitsResultCounter.WithLabelValues(host.Host, "err").Inc()
576576+ return fmt.Errorf("handle user event failed: %w", err)
577577+ }
578578+579579+ repoCommitsResultCounter.WithLabelValues(host.Host, "ok").Inc()
580580+ return nil
581581+ case env.RepoIdentity != nil:
582582+ s.log.Info("bgs got identity event", "did", env.RepoIdentity.Did)
583583+ // Flush any cached DID documents for this user
584584+ s.didr.FlushCacheFor(env.RepoIdentity.Did)
585585+586586+ // Refetch the DID doc and update our cached keys and handle etc.
587587+ _, err := s.handleUserUpdate(ctx, env.RepoIdentity.Did)
588588+ if err != nil {
589589+ return err
590590+ }
591591+592592+ return nil
593593+ case env.RepoAccount != nil:
594594+ span.SetAttributes(
595595+ attribute.String("did", env.RepoAccount.Did),
596596+ attribute.Int64("seq", env.RepoAccount.Seq),
597597+ attribute.Bool("active", env.RepoAccount.Active),
598598+ )
599599+600600+ if env.RepoAccount.Status != nil {
601601+ span.SetAttributes(attribute.String("repo_status", *env.RepoAccount.Status))
602602+ }
603603+604604+ s.log.Info("bgs got account event", "did", env.RepoAccount.Did)
605605+ // Flush any cached DID documents for this user
606606+ s.didr.FlushCacheFor(env.RepoAccount.Did)
607607+608608+ // Refetch the DID doc to make sure the PDS is still authoritative
609609+ ai, err := s.handleUserUpdate(ctx, env.RepoAccount.Did)
610610+ if err != nil {
611611+ span.RecordError(err)
612612+ return err
613613+ }
614614+615615+ // Check if the PDS is still authoritative
616616+ // if not we don't want to be propagating this account event
617617+ if ai.PDS != host.ID {
618618+ s.log.Error("account event from non-authoritative pds",
619619+ "seq", env.RepoAccount.Seq,
620620+ "did", env.RepoAccount.Did,
621621+ "event_from", host.Host,
622622+ "did_doc_declared_pds", ai.PDS,
623623+ "account_evt", env.RepoAccount,
624624+ )
625625+ return fmt.Errorf("event from non-authoritative pds")
626626+ }
627627+628628+ // Process the account status change
629629+ repoStatus := events.AccountStatusActive
630630+ if !env.RepoAccount.Active && env.RepoAccount.Status != nil {
631631+ repoStatus = *env.RepoAccount.Status
632632+ }
633633+634634+ err = s.UpdateAccountStatus(ctx, env.RepoAccount.Did, repoStatus)
635635+ if err != nil {
636636+ span.RecordError(err)
637637+ return fmt.Errorf("failed to update account status: %w", err)
638638+ }
639639+640640+ return nil
641641+ default:
642642+ return fmt.Errorf("invalid fed event")
643643+ }
644644+}
645645+646646+func (s *Archiver) UpdateAccountStatus(ctx context.Context, did string, status string) error {
647647+ ctx, span := tracer.Start(ctx, "UpdateAccountStatus")
648648+ defer span.End()
649649+650650+ span.SetAttributes(
651651+ attribute.String("did", did),
652652+ attribute.String("status", status),
653653+ )
654654+655655+ u, err := s.lookupUserByDid(ctx, did)
656656+ if err != nil {
657657+ return err
658658+ }
659659+660660+ switch status {
661661+ case events.AccountStatusActive:
662662+ // Unset the PDS-specific status flags
663663+ if err := s.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusActive).Error; err != nil {
664664+ return fmt.Errorf("failed to set user active status: %w", err)
665665+ }
666666+ u.SetUpstreamStatus(events.AccountStatusActive)
667667+ case events.AccountStatusDeactivated:
668668+ if err := s.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusDeactivated).Error; err != nil {
669669+ return fmt.Errorf("failed to set user deactivation status: %w", err)
670670+ }
671671+ u.SetUpstreamStatus(events.AccountStatusDeactivated)
672672+ case events.AccountStatusSuspended:
673673+ if err := s.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusSuspended).Error; err != nil {
674674+ return fmt.Errorf("failed to set user suspension status: %w", err)
675675+ }
676676+ u.SetUpstreamStatus(events.AccountStatusSuspended)
677677+ case events.AccountStatusTakendown:
678678+ if err := s.db.Model(User{}).Where("id = ?", u.ID).Update("upstream_status", events.AccountStatusTakendown).Error; err != nil {
679679+ return fmt.Errorf("failed to set user taken down status: %w", err)
680680+ }
681681+ u.SetUpstreamStatus(events.AccountStatusTakendown)
682682+683683+ if err := s.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{
684684+ "handle": nil,
685685+ }).Error; err != nil {
686686+ return err
687687+ }
688688+ case events.AccountStatusDeleted:
689689+ if err := s.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumns(map[string]any{
690690+ "tombstoned": true,
691691+ "handle": nil,
692692+ "upstream_status": events.AccountStatusDeleted,
693693+ }).Error; err != nil {
694694+ return err
695695+ }
696696+ u.SetUpstreamStatus(events.AccountStatusDeleted)
697697+698698+ if err := s.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{
699699+ "handle": nil,
700700+ }).Error; err != nil {
701701+ return err
702702+ }
703703+704704+ // delete data from carstore
705705+ if err := s.repoman.TakeDownRepo(ctx, u.ID); err != nil {
706706+ // don't let a failure here prevent us from propagating this event
707707+ s.log.Error("failed to delete user data from carstore", "err", err)
708708+ }
709709+ }
710710+711711+ return nil
712712+}
713713+714714+func (s *Archiver) lookupUserByDid(ctx context.Context, did string) (*User, error) {
715715+ ctx, span := tracer.Start(ctx, "lookupUserByDid")
716716+ defer span.End()
717717+718718+ cu, ok := s.userCache.Get(did)
719719+ if ok {
720720+ return cu, nil
721721+ }
722722+723723+ var u User
724724+ if err := s.db.Find(&u, "did = ?", did).Error; err != nil {
725725+ return nil, err
726726+ }
727727+728728+ if u.ID == 0 {
729729+ return nil, gorm.ErrRecordNotFound
730730+ }
731731+732732+ s.userCache.Add(did, &u)
733733+734734+ return &u, nil
735735+}
736736+737737+func (s *Archiver) lookupUserByUID(ctx context.Context, uid models.Uid) (*User, error) {
738738+ ctx, span := tracer.Start(ctx, "lookupUserByUID")
739739+ defer span.End()
740740+741741+ var u User
742742+ if err := s.db.Find(&u, "id = ?", uid).Error; err != nil {
743743+ return nil, err
744744+ }
745745+746746+ if u.ID == 0 {
747747+ return nil, gorm.ErrRecordNotFound
748748+ }
749749+750750+ return &u, nil
751751+}
752752+753753+type AuthToken struct {
754754+ gorm.Model
755755+ Token string `gorm:"index"`
756756+}
757757+758758+func (s *Archiver) lookupAdminToken(tok string) (bool, error) {
759759+ var at AuthToken
760760+ if err := s.db.Find(&at, "token = ?", tok).Error; err != nil {
761761+ return false, err
762762+ }
763763+764764+ if at.ID == 0 {
765765+ return false, nil
766766+ }
767767+768768+ return true, nil
769769+}
770770+771771+func (s *Archiver) CreateAdminToken(tok string) error {
772772+ exists, err := s.lookupAdminToken(tok)
773773+ if err != nil {
774774+ return err
775775+ }
776776+777777+ if exists {
778778+ return nil
779779+ }
780780+781781+ return s.db.Create(&AuthToken{
782782+ Token: tok,
783783+ }).Error
784784+}
785785+786786+func (s *Archiver) StartMetrics(listen string) error {
787787+ http.Handle("/metrics", promhttp.Handler())
788788+ return http.ListenAndServe(listen, nil)
789789+}
790790+791791+const serverListenerBootTimeout = 5 * time.Second
792792+793793+func (s *Archiver) Start(addr string) error {
794794+ var lc net.ListenConfig
795795+ ctx, cancel := context.WithTimeout(context.Background(), serverListenerBootTimeout)
796796+ defer cancel()
797797+798798+ li, err := lc.Listen(ctx, "tcp", addr)
799799+ if err != nil {
800800+ return err
801801+ }
802802+ return s.StartWithListener(li)
803803+}
804804+805805+func (s *Archiver) StartWithListener(listen net.Listener) error {
806806+ e := echo.New()
807807+ e.HideBanner = true
808808+809809+ e.Use(middleware.CORSWithConfig(middleware.CORSConfig{
810810+ AllowOrigins: []string{"*"},
811811+ AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept, echo.HeaderAuthorization},
812812+ }))
813813+814814+ if !s.ssl {
815815+ e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{
816816+ Format: "method=${method}, uri=${uri}, status=${status} latency=${latency_human}\n",
817817+ }))
818818+ } else {
819819+ e.Use(middleware.LoggerWithConfig(middleware.DefaultLoggerConfig))
820820+ }
821821+822822+ // React uses a virtual router, so we need to serve the index.html for all
823823+ // routes that aren't otherwise handled or in the /assets directory.
824824+ e.File("/dash", "public/index.html")
825825+ e.File("/dash/*", "public/index.html")
826826+ e.Static("/assets", "public/assets")
827827+828828+ e.Use(svcutil.MetricsMiddleware)
829829+830830+ e.HTTPErrorHandler = func(err error, ctx echo.Context) {
831831+ switch err := err.(type) {
832832+ case *echo.HTTPError:
833833+ if err2 := ctx.JSON(err.Code, map[string]any{
834834+ "error": err.Message,
835835+ }); err2 != nil {
836836+ s.log.Error("Failed to write http error", "err", err2)
837837+ }
838838+ default:
839839+ sendHeader := true
840840+ if ctx.Path() == "/xrpc/com.atproto.sync.subscribeRepos" {
841841+ sendHeader = false
842842+ }
843843+844844+ s.log.Warn("HANDLER ERROR: (%s) %s", ctx.Path(), err)
845845+846846+ if strings.HasPrefix(ctx.Path(), "/admin/") {
847847+ ctx.JSON(500, map[string]any{
848848+ "error": err.Error(),
849849+ })
850850+ return
851851+ }
852852+853853+ if sendHeader {
854854+ ctx.Response().WriteHeader(500)
855855+ }
856856+ }
857857+ }
858858+859859+ // TODO: this API is temporary until we formalize what we want here
860860+861861+ e.GET("/xrpc/com.atproto.sync.getRepo", s.HandleComAtprotoSyncGetRepo)
862862+ //e.GET("/xrpc/com.atproto.sync.listRepos", s.HandleComAtprotoSyncListRepos)
863863+ //e.GET("/xrpc/com.atproto.sync.getLatestCommit", s.HandleComAtprotoSyncGetLatestCommit)
864864+ e.GET("/xrpc/_health", s.HandleHealthCheck)
865865+ e.GET("/_health", s.HandleHealthCheck)
866866+ e.GET("/", s.HandleHomeMessage)
867867+868868+ /*
869869+ admin := e.Group("/admin", s.checkAdminAuth)
870870+871871+ // Slurper-related Admin API
872872+ admin.GET("/subs/getUpstreamConns", s.handleAdminGetUpstreamConns)
873873+ admin.GET("/subs/getEnabled", s.handleAdminGetSubsEnabled)
874874+ admin.GET("/subs/perDayLimit", s.handleAdminGetNewPDSPerDayRateLimit)
875875+ admin.POST("/subs/setEnabled", s.handleAdminSetSubsEnabled)
876876+ admin.POST("/subs/killUpstream", s.handleAdminKillUpstreamConn)
877877+ admin.POST("/subs/setPerDayLimit", s.handleAdminSetNewPDSPerDayRateLimit)
878878+879879+ // Domain-related Admin API
880880+ admin.GET("/subs/listDomainBans", s.handleAdminListDomainBans)
881881+ admin.POST("/subs/banDomain", s.handleAdminBanDomain)
882882+ admin.POST("/subs/unbanDomain", s.handleAdminUnbanDomain)
883883+884884+ // Repo-related Admin API
885885+ admin.POST("/repo/takeDown", s.handleAdminTakeDownRepo)
886886+ admin.POST("/repo/reverseTakedown", s.handleAdminReverseTakedown)
887887+ admin.GET("/repo/takedowns", s.handleAdminListRepoTakeDowns)
888888+ admin.POST("/repo/compact", s.handleAdminCompactRepo)
889889+ admin.POST("/repo/compactAll", s.handleAdminCompactAllRepos)
890890+ admin.POST("/repo/reset", s.handleAdminResetRepo)
891891+ admin.POST("/repo/verify", s.handleAdminVerifyRepo)
892892+893893+ // PDS-related Admin API
894894+ admin.POST("/pds/requestCrawl", s.handleAdminRequestCrawl)
895895+ admin.GET("/pds/list", s.handleListPDSs)
896896+ admin.POST("/pds/resync", s.handleAdminPostResyncPDS)
897897+ admin.GET("/pds/resync", s.handleAdminGetResyncPDS)
898898+ admin.POST("/pds/changeLimits", s.handleAdminChangePDSRateLimits)
899899+ admin.POST("/pds/block", s.handleBlockPDS)
900900+ admin.POST("/pds/unblock", s.handleUnblockPDS)
901901+ admin.POST("/pds/addTrustedDomain", s.handleAdminAddTrustedDomain)
902902+903903+ // Consumer-related Admin API
904904+ admin.GET("/consumers/list", s.handleAdminListConsumers)
905905+ */
906906+907907+ // In order to support booting on random ports in tests, we need to tell the
908908+ // Echo instance it's already got a port, and then use its StartServer
909909+ // method to re-use that listener.
910910+ e.Listener = listen
911911+ srv := &http.Server{}
912912+ return e.StartServer(srv)
913913+}
914914+915915+func (s *Archiver) Shutdown() []error {
916916+ errs := s.slurper.Shutdown()
917917+918918+ s.compactor.Shutdown()
919919+920920+ return errs
921921+}
+314
archiver/crawler.go
···11+package archiver
22+33+import (
44+ "context"
55+ "fmt"
66+ "log/slog"
77+ "sync"
88+ "time"
99+1010+ comatproto "github.com/bluesky-social/indigo/api/atproto"
1111+ "github.com/bluesky-social/indigo/models"
1212+1313+ "go.opentelemetry.io/otel"
1414+)
1515+1616+type CrawlDispatcher struct {
1717+ // from Crawl()
1818+ ingest chan *User
1919+2020+ // from AddToCatchupQueue()
2121+ catchup chan *crawlWork
2222+2323+ // from main loop to fetchWorker()
2424+ repoSync chan *crawlWork
2525+2626+ complete chan models.Uid
2727+2828+ maplk sync.Mutex
2929+ todo map[models.Uid]*crawlWork
3030+ inProgress map[models.Uid]*crawlWork
3131+3232+ repoFetcher CrawlRepoFetcher
3333+3434+ concurrency int
3535+3636+ log *slog.Logger
3737+3838+ done chan struct{}
3939+}
4040+4141+// this is what we need of RepoFetcher
4242+type CrawlRepoFetcher interface {
4343+ FetchAndIndexRepo(ctx context.Context, job *crawlWork) error
4444+}
4545+4646+func NewCrawlDispatcher(repoFetcher CrawlRepoFetcher, concurrency int, log *slog.Logger) (*CrawlDispatcher, error) {
4747+ if concurrency < 1 {
4848+ return nil, fmt.Errorf("must specify a non-zero positive integer for crawl dispatcher concurrency")
4949+ }
5050+5151+ out := &CrawlDispatcher{
5252+ ingest: make(chan *User),
5353+ repoSync: make(chan *crawlWork),
5454+ complete: make(chan models.Uid),
5555+ catchup: make(chan *crawlWork),
5656+ repoFetcher: repoFetcher,
5757+ concurrency: concurrency,
5858+ todo: make(map[models.Uid]*crawlWork),
5959+ inProgress: make(map[models.Uid]*crawlWork),
6060+ log: log,
6161+ done: make(chan struct{}),
6262+ }
6363+ go out.CatchupRepoGaugePoller()
6464+6565+ return out, nil
6666+}
6767+6868+func (c *CrawlDispatcher) Run() {
6969+ go c.mainLoop()
7070+7171+ for i := 0; i < c.concurrency; i++ {
7272+ go c.fetchWorker()
7373+ }
7474+}
7575+7676+func (c *CrawlDispatcher) Shutdown() {
7777+ close(c.done)
7878+}
7979+8080+type catchupJob struct {
8181+ evt *comatproto.SyncSubscribeRepos_Commit
8282+ host *models.PDS
8383+ user *User
8484+}
8585+8686+type crawlWork struct {
8787+ act *User
8888+ initScrape bool
8989+9090+ // for events that come in while this actor's crawl is enqueued
9191+ // catchup items are processed during the crawl
9292+ catchup []*catchupJob
9393+9494+ // for events that come in while this actor is being processed
9595+ // next items are processed after the crawl
9696+ next []*catchupJob
9797+}
9898+9999+func (c *CrawlDispatcher) mainLoop() {
100100+ var nextDispatchedJob *crawlWork
101101+ var jobsAwaitingDispatch []*crawlWork
102102+103103+ // dispatchQueue represents the repoSync worker channel to which we dispatch crawl work
104104+ var dispatchQueue chan *crawlWork
105105+106106+ for {
107107+ select {
108108+ case actorToCrawl := <-c.ingest:
109109+ // TODO: max buffer size
110110+ crawlJob := c.enqueueJobForActor(actorToCrawl)
111111+ if crawlJob == nil {
112112+ break
113113+ }
114114+115115+ if nextDispatchedJob == nil {
116116+ nextDispatchedJob = crawlJob
117117+ dispatchQueue = c.repoSync
118118+ } else {
119119+ jobsAwaitingDispatch = append(jobsAwaitingDispatch, crawlJob)
120120+ }
121121+ case dispatchQueue <- nextDispatchedJob:
122122+ c.dequeueJob(nextDispatchedJob)
123123+124124+ if len(jobsAwaitingDispatch) > 0 {
125125+ nextDispatchedJob = jobsAwaitingDispatch[0]
126126+ jobsAwaitingDispatch = jobsAwaitingDispatch[1:]
127127+ } else {
128128+ nextDispatchedJob = nil
129129+ dispatchQueue = nil
130130+ }
131131+ case catchupJob := <-c.catchup:
132132+ // CatchupJobs are for processing events that come in while a crawl is in progress
133133+ // They are lower priority than new crawls so we only add them to the queue if there isn't already a job in progress
134134+ if nextDispatchedJob == nil {
135135+ nextDispatchedJob = catchupJob
136136+ dispatchQueue = c.repoSync
137137+ } else {
138138+ jobsAwaitingDispatch = append(jobsAwaitingDispatch, catchupJob)
139139+ }
140140+ case uid := <-c.complete:
141141+ c.maplk.Lock()
142142+143143+ job, ok := c.inProgress[uid]
144144+ if !ok {
145145+ panic("should not be possible to not have a job in progress we receive a completion signal for")
146146+ }
147147+ delete(c.inProgress, uid)
148148+149149+ // If there are any subsequent jobs for this UID, add it back to the todo list or buffer.
150150+ // We're basically pumping the `next` queue into the `catchup` queue and will do this over and over until the `next` queue is empty.
151151+ if len(job.next) > 0 {
152152+ c.todo[uid] = job
153153+ job.initScrape = false
154154+ job.catchup = job.next
155155+ job.next = nil
156156+ if nextDispatchedJob == nil {
157157+ nextDispatchedJob = job
158158+ dispatchQueue = c.repoSync
159159+ } else {
160160+ jobsAwaitingDispatch = append(jobsAwaitingDispatch, job)
161161+ }
162162+ }
163163+ c.maplk.Unlock()
164164+ }
165165+ }
166166+}
167167+168168+// enqueueJobForActor adds a new crawl job to the todo list if there isn't already a job in progress for this actor
169169+func (c *CrawlDispatcher) enqueueJobForActor(ai *User) *crawlWork {
170170+ c.maplk.Lock()
171171+ defer c.maplk.Unlock()
172172+ _, ok := c.inProgress[ai.ID]
173173+ if ok {
174174+ return nil
175175+ }
176176+177177+ _, has := c.todo[ai.ID]
178178+ if has {
179179+ return nil
180180+ }
181181+182182+ crawlJob := &crawlWork{
183183+ act: ai,
184184+ initScrape: true,
185185+ }
186186+ c.todo[ai.ID] = crawlJob
187187+ return crawlJob
188188+}
189189+190190+// dequeueJob removes a job from the todo list and adds it to the inProgress list
191191+func (c *CrawlDispatcher) dequeueJob(job *crawlWork) {
192192+ c.maplk.Lock()
193193+ defer c.maplk.Unlock()
194194+ delete(c.todo, job.act.ID)
195195+ c.inProgress[job.act.ID] = job
196196+}
197197+198198+func (c *CrawlDispatcher) addToCatchupQueue(catchup *catchupJob) *crawlWork {
199199+ c.maplk.Lock()
200200+ defer c.maplk.Unlock()
201201+202202+ // If the actor crawl is enqueued, we can append to the catchup queue which gets emptied during the crawl
203203+ job, ok := c.todo[catchup.user.ID]
204204+ if ok {
205205+ catchupEventsEnqueued.WithLabelValues("todo").Inc()
206206+ job.catchup = append(job.catchup, catchup)
207207+ return nil
208208+ }
210210+ // If the actor crawl is in progress, we can append to the next queue which gets emptied after the crawl
211211+ job, ok = c.inProgress[catchup.user.ID]
212212+ if ok {
213213+ catchupEventsEnqueued.WithLabelValues("prog").Inc()
214214+ job.next = append(job.next, catchup)
215215+ return nil
216216+ }
217217+218218+ catchupEventsEnqueued.WithLabelValues("new").Inc()
219219+ // Otherwise, we need to create a new crawl job for this actor and enqueue it
220220+ cw := &crawlWork{
221221+ act: catchup.user,
222222+ catchup: []*catchupJob{catchup},
223223+ }
224224+ c.todo[catchup.user.ID] = cw
225225+ return cw
226226+}
227227+228228+func (c *CrawlDispatcher) fetchWorker() {
229229+ for {
230230+ select {
231231+ case job := <-c.repoSync:
232232+ if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
233233+ c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
234234+ }
235235+236236+ // TODO: do we still just do this if it errors?
237237+ c.complete <- job.act.ID
238238+ }
239239+ }
240240+}
241241+242242+func (c *CrawlDispatcher) Crawl(ctx context.Context, ai *User) error {
243243+ if ai.PDS == 0 {
244244+ panic("must have pds for user in queue")
245245+ }
246246+247247+ userCrawlsEnqueued.Inc()
248248+249249+ ctx, span := otel.Tracer("crawler").Start(ctx, "addToCrawler")
250250+ defer span.End()
251251+252252+ select {
253253+ case c.ingest <- ai:
254254+ return nil
255255+ case <-ctx.Done():
256256+ return ctx.Err()
257257+ }
258258+}
259259+260260+func (c *CrawlDispatcher) AddToCatchupQueue(ctx context.Context, host *models.PDS, u *User, evt *comatproto.SyncSubscribeRepos_Commit) error {
261261+ if u.PDS == 0 {
262262+ panic("must have pds for user in queue")
263263+ }
264264+265265+ catchup := &catchupJob{
266266+ evt: evt,
267267+ host: host,
268268+ user: u,
269269+ }
270270+271271+ cw := c.addToCatchupQueue(catchup)
272272+ if cw == nil {
273273+ return nil
274274+ }
275275+276276+ select {
277277+ case c.catchup <- cw:
278278+ return nil
279279+ case <-ctx.Done():
280280+ return ctx.Err()
281281+ }
282282+}
283283+284284+func (c *CrawlDispatcher) RepoInSlowPath(ctx context.Context, uid models.Uid) bool {
285285+ c.maplk.Lock()
286286+ defer c.maplk.Unlock()
287287+ if _, ok := c.todo[uid]; ok {
288288+ return true
289289+ }
290290+291291+ if _, ok := c.inProgress[uid]; ok {
292292+ return true
293293+ }
294294+295295+ return false
296296+}
297297+298298+func (c *CrawlDispatcher) countReposInSlowPath() int {
299299+ c.maplk.Lock()
300300+ defer c.maplk.Unlock()
301301+ return len(c.inProgress) + len(c.todo)
302302+}
303303+304304+func (c *CrawlDispatcher) CatchupRepoGaugePoller() {
305305+ ticker := time.NewTicker(30 * time.Second)
306306+ defer ticker.Stop()
307307+ for {
308308+ select {
308308+ case <-c.done:
 return
310310+ case <-ticker.C:
311311+ catchupReposGauge.Set(float64(c.countReposInSlowPath()))
312312+ }
313313+ }
314314+}
+88
archiver/handlers.go
···11+package archiver
22+33+import (
44+ "errors"
55+ "fmt"
66+ "net/http"
77+88+ "github.com/bluesky-social/indigo/atproto/syntax"
99+ "github.com/bluesky-social/indigo/bgs"
1010+ "github.com/bluesky-social/indigo/events"
1111+ "github.com/labstack/echo/v4"
1212+ "github.com/labstack/gommon/log"
1313+ "go.opentelemetry.io/otel"
1414+ "gorm.io/gorm"
1515+)
1616+1717+func (s *Archiver) HandleComAtprotoSyncGetRepo(c echo.Context) error {
1818+ ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetRepo")
1919+ defer span.End()
2020+ did := c.QueryParam("did")
2121+ since := c.QueryParam("since")
2222+2323+ _, err := syntax.ParseDID(did)
2424+ if err != nil {
2525+ return c.JSON(http.StatusBadRequest, bgs.XRPCError{Message: fmt.Sprintf("invalid did: %s", did)})
2626+ }
2727+2828+ c.Response().Header().Set(echo.HeaderContentType, "application/vnd.ipld.car")
2929+3030+ u, err := s.lookupUserByDid(ctx, did)
3131+ if err != nil {
3232+ if errors.Is(err, gorm.ErrRecordNotFound) {
3333+ return echo.NewHTTPError(http.StatusNotFound, "user not found")
3434+ }
3535+ s.log.Error("failed to lookup user", "err", err, "did", did)
3636+ return echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
3737+ }
3838+3939+ if u.GetTombstoned() {
4040+ return fmt.Errorf("account was deleted")
4141+ }
4242+4343+ if u.GetTakenDown() {
4444+ return fmt.Errorf("account was taken down by the Relay")
4545+ }
4646+4747+ ustatus := u.GetUpstreamStatus()
4848+ if ustatus == events.AccountStatusTakendown {
4949+ return fmt.Errorf("account was taken down by its PDS")
5050+ }
5151+5252+ if ustatus == events.AccountStatusDeactivated {
5353+ return fmt.Errorf("account is temporarily deactivated")
5454+ }
5555+5656+ if ustatus == events.AccountStatusSuspended {
5757+ return fmt.Errorf("account is suspended by its PDS")
5858+ }
5959+6060+ if err := s.repoman.ReadRepo(ctx, u.ID, since, c.Response()); err != nil {
6161+ s.log.Error("failed to stream repo", "err", err, "did", did)
6262+ return echo.NewHTTPError(http.StatusInternalServerError, "failed to stream repo")
6363+ }
6464+6565+ return nil
6666+}
6767+6868+type HealthStatus struct {
6969+ Status string `json:"status"`
7070+ Message string `json:"msg,omitempty"`
7171+}
7272+7373+func (s *Archiver) HandleHealthCheck(c echo.Context) error {
7474+ if err := s.db.Exec("SELECT 1").Error; err != nil {
7575+ s.log.Error("healthcheck can't connect to database", "err", err)
7676+ return c.JSON(500, HealthStatus{Status: "error", Message: "can't connect to database"})
7777+ } else {
7878+ return c.JSON(200, HealthStatus{Status: "ok"})
7979+ }
8080+}
8181+8282+var homeMessage string = `
8383+[ insert fancy archiver art here ]
8484+`
8585+8686+func (s *Archiver) HandleHomeMessage(c echo.Context) error {
8787+ return c.String(http.StatusOK, homeMessage)
8888+}
···11+#!/usr/bin/env python3
22+#
33+# pip install requests
44+#
55+# python3 resync_pdses.py --admin-key hunter2 --url http://myrelay:2470 host_per_line.txt
66+77+import json
88+import sys
99+import urllib.parse
1010+1111+import requests
1212+1313+1414+# pds limits for POST /admin/pds/changeLimits
1515+# {"host":"", "per_second": int, "per_hour": int, "per_day": int, "crawl_rate": int, "repo_limit": int}
1616+1717+limitsKeys = ('per_second', 'per_hour', 'per_day', 'crawl_rate', 'repo_limit')
1818+1919+def checkLimits(limits):
2020+ for k in limits.keys():
2121+ if k not in limitsKeys:
2222+ raise Exception(f"unknown pds rate limits key {k!r}")
2323+ return True
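# Illustrative invocation (the numbers below are made up, not recommendations);
# --limits accepts a JSON object using any of the limitsKeys above:
#
#   python3 resync_pdses.py --admin-key hunter2 --url http://myrelay:2470 \
#       --crawl --limits '{"per_second": 50, "per_hour": 10000, "per_day": 100000, "crawl_rate": 5, "repo_limit": 1000000}' \
#       host_per_line.txt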
2424+2525+class relay:
2626+ def __init__(self, rooturl, headers=None, session=None):
2727+ "rooturl string, headers dict or None, session requests.Session() or None"
2828+ self.rooturl = rooturl
2929+ self.headers = headers or dict()
3030+ self.session = session or requests.Session()
3131+3232+ def resync(self, host):
3333+ "host string"
3434+ url = urllib.parse.urljoin(self.rooturl, '/admin/pds/resync')
3535+ response = self.session.post(url, params={"host": host}, headers=self.headers, data='')
3636+ if response.status_code != 200:
3737+ sys.stderr.write(f"{url}?host={host} : ({response.status_code}) ({response.text!r})\n")
3838+ else:
3939+ sys.stderr.write(f"{url}?host={host} : OK\n")
4040+4141+ def crawlAndSetLimits(self, host, limits):
4242+ "host string, limits dict"
4343+ pheaders = dict(self.headers)
4444+ pheaders['Content-Type'] = 'application/json'
4545+ url = urllib.parse.urljoin(self.rooturl, '/admin/pds/requestCrawl')
4646+ response = self.session.post(url, headers=pheaders, data=json.dumps({"hostname": host}))
4747+ if response.status_code != 200:
4848+ sys.stderr.write(f"{url} {host} : {response.status_code} {response.text!r}\n")
4949+ return
5050+ if limits is None:
5151+ sys.stderr.write(f"requestCrawl {host} OK\n")
5252+ url = urllib.parse.urljoin(self.rooturl, '/admin/pds/changeLimits')
5353+ plimits = dict(limits)
5454+ plimits["host"] = host
5555+ response = self.session.post(url, headers=pheaders, data=json.dumps(plimits))
5656+ if response.status_code != 200:
5757+ sys.stderr.write(f"{url} {host} : {response.status_code} {response.text!r}\n")
5858+ return
5959+ sys.stderr.write(f"requestCrawl + changeLimits {host} OK\n")
6060+6161+def main():
6262+ import argparse
6363+ ap = argparse.ArgumentParser()
6464+ ap.add_argument('input', nargs='?', default='-', help='host per line text file to read, - for stdin')
6565+ ap.add_argument('--admin-key', default=None, help='relay auth bearer token', required=True)
6666+ ap.add_argument('--url', default=None, help='base url to POST /admin/pds/resync', required=True)
6767+ ap.add_argument('--resync', default=False, action='store_true', help='resync selected PDSes')
6868+ ap.add_argument('--limits', default=None, help='json pds rate limits')
6969+ ap.add_argument('--crawl', default=False, action='store_true', help='crawl & set limits')
7070+ args = ap.parse_args()
7171+7272+ headers = {'Authorization': 'Bearer ' + args.admin_key}
7373+7474+ relaySession = relay(args.url, headers)
7575+7676+ #url = urllib.parse.urljoin(args.url, '/admin/pds/resync')
7777+7878+ #sess = requests.Session()
7979+ if args.crawl and args.resync:
8080+ sys.stderr.write("should only specify one of --resync --crawl")
8181+ sys.exit(1)
8282+ if (not args.crawl) and (not args.resync):
8383+ sys.stderr.write("should specify one of --resync --crawl")
8484+ sys.exit(1)
8585+8686+ limits = None
8787+ if args.limits:
8888+ limits = json.loads(args.limits)
8989+ checkLimits(limits)
9090+9191+ if args.input == '-':
9292+ fin = sys.stdin
9393+ else:
9494+ fin = open(args.input, 'rt')
9595+ for line in fin:
9696+ if not line:
9797+ continue
9898+ line = line.strip()
9999+ if not line:
100100+ continue
101101+ if line[0] == '#':
102102+ continue
103103+ host = line
104104+ if args.crawl:
105105+ relaySession.crawlAndSetLimits(host, limits)
106106+ elif args.resync:
107107+ relaySession.resync(host)
108108+ # response = sess.post(url, params={"host": line}, headers=headers)
109109+ # if response.status_code != 200:
110110+ # sys.stderr.write(f"{url}?host={line} : ({response.status_code}) ({response.text!r})\n")
111111+ # else:
112112+ # sys.stderr.write(f"{url}?host={line} : OK\n")
113113+114114+if __name__ == '__main__':
115115+ main()
+24
cmd/archit/sync_pds.sh
···11+#!/usr/bin/env bash
22+33+set -e # fail on error
44+set -u # fail if variable not set in substitution
55+set -o pipefail # fail if part of a '|' command fails
66+77+if test -z "${RELAY_ADMIN_KEY}"; then
88+ echo "RELAY_ADMIN_KEY secret is not defined"
99+ exit -1
1010+fi
1111+1212+if test -z "${RELAY_HOST}"; then
1313+ echo "RELAY_HOST config not defined"
1414+ exit -1
1515+fi
1616+1717+if test -z "$1"; then
1818+ echo "expected PDS hostname as an argument"
1919+ exit -1
2020+fi
2121+2222+echo "POST resync $1"
2323+http --ignore-stdin post https://${RELAY_HOST}/admin/pds/resync Authorization:"Bearer ${RELAY_ADMIN_KEY}" \
2424+ host==$1
+24
cmd/archit/sync_status_pds.sh
···11+#!/usr/bin/env bash
22+33+set -e # fail on error
44+set -u # fail if variable not set in substitution
55+set -o pipefail # fail if part of a '|' command fails
66+77+if test -z "${RELAY_ADMIN_KEY}"; then
88+ echo "RELAY_ADMIN_KEY secret is not defined"
99+ exit -1
1010+fi
1111+1212+if test -z "${RELAY_HOST}"; then
1313+ echo "RELAY_HOST config not defined"
1414+ exit -1
1515+fi
1616+1717+if test -z "$1"; then
1818+ echo "expected PDS hostname as an argument"
1919+ exit -1
2020+fi
2121+2222+echo "GET resync $1"
2323+http --ignore-stdin --pretty all get https://${RELAY_HOST}/admin/pds/resync Authorization:"Bearer ${RELAY_ADMIN_KEY}" \
2424+ host==$1
+41
repostore/README.md
···11+# Carstore
33+Store PDS-like repos for a zillion users, with more limited operations (mainly: firehose in, firehose out).
44+55+## [ScyllaStore](scylla.go)
66+77+Blocks stored in ScyllaDB.
88+User and PDS metadata stored in gorm (PostgreSQL or sqlite3).
99+1010+## [FileCarStore](bs.go)
1111+1212+Stores 'car slices' from PDS subscribeRepos firehose streams on the filesystem.
1313+Shard metadata stored in gorm (PostgreSQL or sqlite3).
1414+Periodic compaction merges car slices into fewer, larger car slices.
1515+User and PDS metadata stored in gorm (PostgreSQL or sqlite3).
1616+FileCarStore was the first production carstore and was used through at least 2024-11.
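
As a rough sketch of how this store gets wired up and read back (written as if it lived in this package; the sqlite metadata DB, shard directory, and uid below are placeholders, not a recommended setup):

```go
package repostore

import (
	"context"
	"os"

	"github.com/bluesky-social/indigo/models"
	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)

// exampleReadUserCar opens a FileCarStore via NewCarStore and streams one
// user's repo out as a CAR file, using the CarStore interface from bs.go.
func exampleReadUserCar() error {
	// gorm metadata DB; PostgreSQL works the same way per the notes above.
	meta, err := gorm.Open(sqlite.Open("carstore-meta.sqlite"), &gorm.Config{})
	if err != nil {
		return err
	}

	cs, err := NewCarStore(meta, []string{"/data/carstore"})
	if err != nil {
		return err
	}

	// Empty sinceRev streams the full repo for uid 1 to stdout.
	return cs.ReadUserCar(context.Background(), models.Uid(1), "", true, os.Stdout)
}
```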
1717+1818+## [SQLiteStore](sqlite_store.go)
1919+2020+Experimental/demo.
2121+Blocks stored in trivial local sqlite3 schema.
2222+Minimal reference implementation from which fancy scalable/performant implementations may be derived.
2323+2424+```sql
2525+CREATE TABLE IF NOT EXISTS blocks (uid int, cid blob, rev varchar, root blob, block blob, PRIMARY KEY(uid,cid))
2626+CREATE INDEX IF NOT EXISTS blocx_by_rev ON blocks (uid, rev DESC)
2727+2828+INSERT INTO blocks (uid, cid, rev, root, block) VALUES (?, ?, ?, ?, ?) ON CONFLICT (uid,cid) DO UPDATE SET rev=excluded.rev, root=excluded.root, block=excluded.block
2929+3030+SELECT rev, root FROM blocks WHERE uid = ? ORDER BY rev DESC LIMIT 1
3131+3232+SELECT cid,rev,root,block FROM blocks WHERE uid = ? AND rev > ? ORDER BY rev DESC
3333+3434+DELETE FROM blocks WHERE uid = ?
3535+3636+SELECT rev, root FROM blocks WHERE uid = ? AND cid = ? LIMIT 1
3737+3838+SELECT block FROM blocks WHERE uid = ? AND cid = ? LIMIT 1
3939+4040+SELECT length(block) FROM blocks WHERE uid = ? AND cid = ? LIMIT 1
4141+```
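
To get a feel for how thin this layer is, here is a standalone sketch (not part of the package) that reads a user's current head straight from the schema above with `database/sql`; the sqlite3 driver and file name are arbitrary choices for the example:

```go
package main

import (
	"database/sql"
	"fmt"

	_ "github.com/mattn/go-sqlite3" // any sqlite3 driver works; this one is assumed
)

// headForUser runs the same "latest rev/root" query the schema above is indexed for.
func headForUser(db *sql.DB, uid int) (rev string, root []byte, err error) {
	err = db.QueryRow(
		`SELECT rev, root FROM blocks WHERE uid = ? ORDER BY rev DESC LIMIT 1`, uid,
	).Scan(&rev, &root)
	return rev, root, err
}

func main() {
	db, err := sql.Open("sqlite3", "blocks.sqlite3")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	rev, root, err := headForUser(db, 1)
	if err != nil {
		panic(err)
	}
	fmt.Printf("uid=1 rev=%s root=%x\n", rev, root)
}
```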
+1155
repostore/bs.go
···11+package repostore
22+33+import (
44+ "bufio"
55+ "bytes"
66+ "context"
77+ "fmt"
88+ "io"
99+ "log/slog"
1010+ "os"
1111+ "path/filepath"
1212+ "sort"
1313+ "time"
1414+1515+ carstore "github.com/bluesky-social/indigo/carstore"
1616+ carstore1 "github.com/bluesky-social/indigo/carstore"
1717+ "github.com/bluesky-social/indigo/models"
1818+ "github.com/prometheus/client_golang/prometheus"
1919+ "github.com/prometheus/client_golang/prometheus/promauto"
2020+2121+ blockformat "github.com/ipfs/go-block-format"
2222+ "github.com/ipfs/go-cid"
2323+ blockstore "github.com/ipfs/go-ipfs-blockstore"
2424+ cbor "github.com/ipfs/go-ipld-cbor"
2525+ ipld "github.com/ipfs/go-ipld-format"
2626+ "github.com/ipfs/go-libipfs/blocks"
2727+ car "github.com/ipld/go-car"
2828+ carutil "github.com/ipld/go-car/util"
2929+ cbg "github.com/whyrusleeping/cbor-gen"
3030+ "go.opentelemetry.io/otel"
3131+ "go.opentelemetry.io/otel/attribute"
3232+ "gorm.io/gorm"
3333+)
3434+3535+var blockGetTotalCounter = promauto.NewCounterVec(prometheus.CounterOpts{
3636+ Name: "carstore2_block_get_total",
3737+ Help: "carstore get queries",
3838+}, []string{"usrskip", "cache"})
3939+4040+const MaxSliceLength = 2 << 20
4141+4242+const BigShardThreshold = 2 << 20
4343+4444+type CarStore interface {
4545+ // TODO: not really part of general interface
4646+ CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*carstore1.CompactionStats, error)
4747+ // TODO: not really part of general interface
4848+ GetCompactionTargets(ctx context.Context, shardCount int) ([]carstore1.CompactionTarget, error)
4949+5050+ GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error)
5151+ GetUserRepoRev(ctx context.Context, user models.Uid) (string, error)
5252+ ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, carstore1.BlockStorage, error)
5353+ NewDeltaSession(ctx context.Context, user models.Uid, since *string) (carstore1.BlockStorage, error)
5454+ ReadOnlySession(user models.Uid) (carstore1.BlockStorage, error)
5555+ ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error
5656+ Stat(ctx context.Context, usr models.Uid) ([]carstore1.UserStat, error)
5757+ WipeUserData(ctx context.Context, user models.Uid) error
5858+}
5959+6060+type FileCarStore struct {
6161+ meta *CarStoreGormMeta
6262+ rootDirs []string
6363+6464+ lastShardCache lastShardCache
6565+6666+ log *slog.Logger
6767+}
6868+6969+func NewCarStore(meta *gorm.DB, roots []string) (CarStore, error) {
7070+ for _, root := range roots {
7171+ if _, err := os.Stat(root); err != nil {
7272+ if !os.IsNotExist(err) {
7373+ return nil, err
7474+ }
7575+7676+ if err := os.Mkdir(root, 0775); err != nil {
7777+ return nil, err
7878+ }
7979+ }
8080+ }
8181+ if err := meta.AutoMigrate(&CarShard{}, &blockRef{}); err != nil {
8282+ return nil, err
8383+ }
8484+ if err := meta.AutoMigrate(&staleRef{}); err != nil {
8585+ return nil, err
8686+ }
8787+8888+ gormMeta := &CarStoreGormMeta{meta: meta}
8989+ out := &FileCarStore{
9090+ meta: gormMeta,
9191+ rootDirs: roots,
9292+ lastShardCache: lastShardCache{
9393+ source: gormMeta,
9494+ },
9595+ log: slog.Default().With("system", "carstore"),
9696+ }
9797+ out.lastShardCache.Init()
9898+ return out, nil
9999+}
100100+101101+// wrapper into a block store that keeps track of which user we are working on behalf of
102102+type userView struct {
103103+ user models.Uid
104104+105105+ cache map[cid.Cid]blockformat.Block
106106+ prefetch bool
107107+}
108108+109109+var _ blockstore.Blockstore = (*userView)(nil)
110110+111111+func (uv *userView) HashOnRead(hor bool) {
112112+ //noop
113113+}
114114+115115+func (uv *userView) Has(ctx context.Context, k cid.Cid) (bool, error) {
116116+ _, have := uv.cache[k]
117117+ if have {
118118+ return have, nil
119119+ }
120120+ return false, nil
121121+}
122122+123123+func (uv *userView) Get(ctx context.Context, k cid.Cid) (blockformat.Block, error) {
124124+ if !k.Defined() {
125125+ return nil, fmt.Errorf("attempted to 'get' undefined cid")
126126+ }
127127+ if uv.cache != nil {
128128+ blk, ok := uv.cache[k]
129129+ if ok {
130130+ return blk, nil
131131+ }
132132+ }
133133+134134+ return nil, fmt.Errorf("cant do arbitrary reads from this")
135135+}
136136+137137+const prefetchThreshold = 512 << 10
138138+139139+func (uv *userView) prefetchRead(ctx context.Context, k cid.Cid, path string, offset int64) (blockformat.Block, error) {
140140+ ctx, span := otel.Tracer("carstore").Start(ctx, "prefetchRead")
141141+ defer span.End()
142142+143143+ fi, err := os.Open(path)
144144+ if err != nil {
145145+ return nil, err
146146+ }
147147+ defer fi.Close()
148148+149149+ st, err := fi.Stat()
150150+ if err != nil {
151151+ return nil, fmt.Errorf("stat file for prefetch: %w", err)
152152+ }
153153+154154+ span.SetAttributes(attribute.Int64("shard_size", st.Size()))
155155+156156+ if st.Size() > prefetchThreshold {
157157+ span.SetAttributes(attribute.Bool("no_prefetch", true))
158158+ return doBlockRead(fi, k, offset)
159159+ }
160160+161161+ cr, err := car.NewCarReader(fi)
162162+ if err != nil {
163163+ return nil, err
164164+ }
165165+166166+ for {
167167+ blk, err := cr.Next()
168168+ if err != nil {
169169+ if err == io.EOF {
170170+ break
171171+ }
172172+ return nil, err
173173+ }
174174+175175+ uv.cache[blk.Cid()] = blk
176176+ }
177177+178178+ outblk, ok := uv.cache[k]
179179+ if !ok {
180180+ return nil, fmt.Errorf("requested block was not found in car slice")
181181+ }
182182+183183+ return outblk, nil
184184+}
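// preloadBlocksFromFile reads an entire shard CAR file into the userView's
// in-memory cache. NewDeltaSession uses it so that base reads during a delta
// session can be served from memory, since Get above only consults the cache.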
185185+186186+func (uv *userView) preloadBlocksFromFile(ctx context.Context, path string) error {
187187+ fi, err := os.Open(path)
188188+ if err != nil {
189189+ return err
190190+ }
191191+ defer fi.Close()
192192+193193+ cr, err := car.NewCarReader(fi)
194194+ if err != nil {
195195+ return err
196196+ }
197197+198198+ for {
199199+ blk, err := cr.Next()
200200+ if err != nil {
201201+ if err == io.EOF {
202202+ break
203203+ }
204204+ return err
205205+ }
206206+207207+ uv.cache[blk.Cid()] = blk
208208+ }
209209+210210+ return nil
211211+}
212212+213213+func (uv *userView) singleRead(ctx context.Context, k cid.Cid, path string, offset int64) (blockformat.Block, error) {
214214+ fi, err := os.Open(path)
215215+ if err != nil {
216216+ return nil, err
217217+ }
218218+ defer fi.Close()
219219+220220+ return doBlockRead(fi, k, offset)
221221+}
222222+223223+func doBlockRead(fi *os.File, k cid.Cid, offset int64) (blockformat.Block, error) {
224224+ seeked, err := fi.Seek(offset, io.SeekStart)
225225+ if err != nil {
226226+ return nil, err
227227+ }
228228+229229+ if seeked != offset {
230230+ return nil, fmt.Errorf("failed to seek to offset (%d != %d)", seeked, offset)
231231+ }
232232+233233+ bufr := bufio.NewReader(fi)
234234+ rcid, data, err := carutil.ReadNode(bufr)
235235+ if err != nil {
236236+ return nil, err
237237+ }
238238+239239+ if rcid != k {
240240+ return nil, fmt.Errorf("mismatch in cid on disk: %s != %s", rcid, k)
241241+ }
242242+243243+ return blocks.NewBlockWithCid(data, rcid)
244244+}
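// Note on the shard layout these readers rely on: each shard file begins with
// an LdWrite-framed CAR header (see WriteCarHeader below) followed by
// varint-length-prefixed (cid || block data) sections; the per-block offsets
// recorded in the metadata DB point at the start of a block's length prefix,
// which is exactly where doBlockRead seeks before calling carutil.ReadNode.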
245245+246246+func (uv *userView) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
247247+ return nil, fmt.Errorf("not implemented")
248248+}
249249+250250+func (uv *userView) Put(ctx context.Context, blk blockformat.Block) error {
251251+ return fmt.Errorf("puts not supported to car view blockstores")
252252+}
253253+254254+func (uv *userView) PutMany(ctx context.Context, blks []blockformat.Block) error {
255255+ return fmt.Errorf("puts not supported to car view blockstores")
256256+}
257257+258258+func (uv *userView) DeleteBlock(ctx context.Context, k cid.Cid) error {
259259+ return fmt.Errorf("deletes not supported to car view blockstore")
260260+}
261261+262262+func (uv *userView) GetSize(ctx context.Context, k cid.Cid) (int, error) {
263263+ // TODO: maybe block size should be in the database record...
264264+ blk, err := uv.Get(ctx, k)
265265+ if err != nil {
266266+ return 0, err
267267+ }
268268+269269+ return len(blk.RawData()), nil
270270+}
271271+272272+// subset of blockstore.Blockstore that we actually use here
273273+type minBlockstore interface {
274274+ Get(ctx context.Context, bcid cid.Cid) (blockformat.Block, error)
275275+ Has(ctx context.Context, bcid cid.Cid) (bool, error)
276276+ GetSize(ctx context.Context, bcid cid.Cid) (int, error)
277277+}
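// DeltaSession buffers blocks written on top of a read-only base view for a
// single user. New blocks accumulate in blks until CloseWithRoot persists them
// as a shard, while rmcids (populated by CalcDiff) records blocks of the
// previous tree that the new revision no longer references.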
278278+279279+type DeltaSession struct {
280280+ blks map[cid.Cid]blockformat.Block
281281+ rmcids map[cid.Cid]bool
282282+ base minBlockstore
283283+ user models.Uid
284284+ baseCid cid.Cid
285285+ seq int
286286+ readonly bool
287287+ cs shardWriter
288288+ lastRev string
289289+}
290290+291291+func (cs *FileCarStore) checkLastShardCache(user models.Uid) *CarShard {
292292+ return cs.lastShardCache.check(user)
293293+}
294294+295295+func (cs *FileCarStore) removeLastShardCache(user models.Uid) {
296296+ cs.lastShardCache.remove(user)
297297+}
298298+299299+func (cs *FileCarStore) putLastShardCache(ls *CarShard) {
300300+ cs.lastShardCache.put(ls)
301301+}
302302+303303+func (cs *FileCarStore) getLastShard(ctx context.Context, user models.Uid) (*CarShard, error) {
304304+ return cs.lastShardCache.get(ctx, user)
305305+}
306306+307307+func (cs *FileCarStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (carstore1.BlockStorage, error) {
308308+ ctx, span := otel.Tracer("carstore2").Start(ctx, "NewSession")
309309+ defer span.End()
310310+311311+ // TODO: ensure that we don't write updates on top of the wrong head
312312+ // this needs to be a compare and swap type operation
313313+ lastShard, err := cs.getLastShard(ctx, user)
314314+ if err != nil {
315315+ return nil, err
316316+ }
317317+318318+ if since != nil && *since != lastShard.Rev {
319319+ return nil, fmt.Errorf("revision mismatch: %s != %s: %w", *since, lastShard.Rev, carstore1.ErrRepoBaseMismatch)
320320+ }
321321+ uv := &userView{
322322+ user: user,
323323+ prefetch: true,
324324+ cache: make(map[cid.Cid]blockformat.Block),
325325+ }
326326+327327+ if lastShard.ID != 0 {
328328+ if err := uv.preloadBlocksFromFile(ctx, lastShard.Path); err != nil {
329329+ return nil, fmt.Errorf("block prefetch failed: %w", err)
330330+ }
331331+ }
332332+333333+ return &DeltaSession{
334334+ blks: make(map[cid.Cid]blockformat.Block),
335335+ base: uv,
336336+ user: user,
337337+ baseCid: lastShard.Root.CID,
338338+ cs: cs,
339339+ seq: lastShard.Seq + 1,
340340+ lastRev: lastShard.Rev,
341341+ }, nil
342342+}
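// Sketch of the write path through a delta session (illustrative; it assumes
// the carstore1.BlockStorage value returned above is the *DeltaSession built
// here, and that newBlocks, newRoot and newRev come from the caller):
//
//	ds, err := cs.NewDeltaSession(ctx, uid, &prevRev)
//	for _, blk := range newBlocks {
//		err = ds.Put(ctx, blk)
//	}
//	err = ds.(*DeltaSession).CalcDiff(ctx, nil)                             // fill rmcids vs the old tree
//	carBytes, err := ds.(*DeltaSession).CloseWithRoot(ctx, newRoot, newRev) // persist the new shard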
343343+344344+func (cs *FileCarStore) ReadOnlySession(user models.Uid) (carstore1.BlockStorage, error) {
345345+ return &DeltaSession{
346346+ base: &userView{
347347+ user: user,
348348+ prefetch: false,
349349+ cache: make(map[cid.Cid]blockformat.Block),
350350+ },
351351+ readonly: true,
352352+ user: user,
353353+ cs: cs,
354354+ }, nil
355355+}
356356+357357+// TODO: incremental is only ever called true, remove the param
358358+func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, shardOut io.Writer) error {
359359+ ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar")
360360+ defer span.End()
361361+362362+ var earlySeq int
363363+ if sinceRev != "" {
364364+ var err error
365365+ earlySeq, err = cs.meta.SeqForRev(ctx, user, sinceRev)
366366+ if err != nil {
367367+ return err
368368+ }
369369+ }
370370+371371+ shards, err := cs.meta.GetUserShardsDesc(ctx, user, earlySeq)
372372+ if err != nil {
373373+ return err
374374+ }
375375+376376+ // TODO: incremental is only ever called true, so this is fine and we can remove the error check
377377+ if !incremental && earlySeq > 0 {
378378+ // have to do it the ugly way
379379+ return fmt.Errorf("nyi")
380380+ }
381381+382382+ if len(shards) == 0 {
383383+ return fmt.Errorf("no data found for user %d", user)
384384+ }
385385+386386+ // fast path!
387387+ if err := car.WriteHeader(&car.CarHeader{
388388+ Roots: []cid.Cid{shards[0].Root.CID},
389389+ Version: 1,
390390+ }, shardOut); err != nil {
391391+ return err
392392+ }
393393+394394+ for _, sh := range shards {
395395+ if err := cs.writeShardBlocks(ctx, &sh, shardOut); err != nil {
396396+ return err
397397+ }
398398+ }
399399+400400+ return nil
401401+}
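// The CAR produced above is rooted at the user's latest head (shards[0] from
// the descending query) with shard block sections written newest-first; when
// sinceRev is set, GetUserShardsDesc is passed that revision's sequence number
// so earlier shards are left out.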
402402+403403+// writeShardBlocks is the inner loop of ReadUserCar: it copies a single
404404+// shard's block section from disk to the writer.
405405+func (cs *FileCarStore) writeShardBlocks(ctx context.Context, sh *CarShard, shardOut io.Writer) error {
406406+ ctx, span := otel.Tracer("carstore").Start(ctx, "writeShardBlocks")
407407+ defer span.End()
408408+409409+ fi, err := os.Open(sh.Path)
410410+ if err != nil {
411411+ return err
412412+ }
413413+ defer fi.Close()
414414+415415+ _, err = fi.Seek(sh.DataStart, io.SeekStart)
416416+ if err != nil {
417417+ return err
418418+ }
419419+420420+ _, err = io.Copy(shardOut, fi)
421421+ if err != nil {
422422+ return err
423423+ }
424424+425425+ return nil
426426+}
427427+428428+// inner loop part of compactBucket
429429+func (cs *FileCarStore) iterateShardBlocks(ctx context.Context, sh *CarShard, cb func(blk blockformat.Block) error) error {
430430+ fi, err := os.Open(sh.Path)
431431+ if err != nil {
432432+ return err
433433+ }
434434+ defer fi.Close()
435435+436436+ rr, err := car.NewCarReader(fi)
437437+ if err != nil {
438438+ return fmt.Errorf("opening shard car: %w", err)
439439+ }
440440+441441+ for {
442442+ blk, err := rr.Next()
443443+ if err != nil {
444444+ if err == io.EOF {
445445+ return nil
446446+ }
447447+ return err
448448+ }
449449+450450+ if err := cb(blk); err != nil {
451451+ return err
452452+ }
453453+ }
454454+}
455455+456456+var _ blockstore.Blockstore = (*DeltaSession)(nil)
457457+458458+func (ds *DeltaSession) BaseCid() cid.Cid {
459459+ return ds.baseCid
460460+}
461461+462462+func (ds *DeltaSession) Put(ctx context.Context, b blockformat.Block) error {
463463+ if ds.readonly {
464464+ return fmt.Errorf("cannot write to readonly deltaSession")
465465+ }
466466+ ds.blks[b.Cid()] = b
467467+ return nil
468468+}
469469+470470+func (ds *DeltaSession) PutMany(ctx context.Context, bs []blockformat.Block) error {
471471+ if ds.readonly {
472472+ return fmt.Errorf("cannot write to readonly deltaSession")
473473+ }
474474+475475+ for _, b := range bs {
476476+ ds.blks[b.Cid()] = b
477477+ }
478478+ return nil
479479+}
480480+481481+func (ds *DeltaSession) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
482482+ return nil, fmt.Errorf("AllKeysChan not implemented")
483483+}
484484+485485+func (ds *DeltaSession) DeleteBlock(ctx context.Context, c cid.Cid) error {
486486+ if ds.readonly {
487487+ return fmt.Errorf("cannot write to readonly deltaSession")
488488+ }
489489+490490+ if _, ok := ds.blks[c]; !ok {
491491+ return ipld.ErrNotFound{Cid: c}
492492+ }
493493+494494+ delete(ds.blks, c)
495495+ return nil
496496+}
497497+498498+func (ds *DeltaSession) Get(ctx context.Context, c cid.Cid) (blockformat.Block, error) {
499499+ b, ok := ds.blks[c]
500500+ if ok {
501501+ return b, nil
502502+ }
503503+504504+ return ds.base.Get(ctx, c)
505505+}
506506+507507+func (ds *DeltaSession) Has(ctx context.Context, c cid.Cid) (bool, error) {
508508+ _, ok := ds.blks[c]
509509+ if ok {
510510+ return true, nil
511511+ }
512512+513513+ return ds.base.Has(ctx, c)
514514+}
515515+516516+func (ds *DeltaSession) HashOnRead(hor bool) {
517517+ // noop?
518518+}
519519+520520+func (ds *DeltaSession) GetSize(ctx context.Context, c cid.Cid) (int, error) {
521521+ b, ok := ds.blks[c]
522522+ if ok {
523523+ return len(b.RawData()), nil
524524+ }
525525+526526+ return ds.base.GetSize(ctx, c)
527527+}
528528+529529+func fnameForShard(user models.Uid, seq int) string {
530530+ return fmt.Sprintf("sh-%d-%d", user, seq)
531531+}
532532+533533+func (cs *FileCarStore) dirForUser(user models.Uid) string {
534534+ return cs.rootDirs[int(user)%len(cs.rootDirs)]
535535+}
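// For example, fnameForShard(42, 7) yields "sh-42-7", and with two root dirs
// dirForUser(42) picks rootDirs[42%2] == rootDirs[0], so that shard would live
// at rootDirs[0]/sh-42-7.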
536536+537537+func (cs *FileCarStore) openNewShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) {
538538+ // TODO: some overwrite protections
539539+ fname := filepath.Join(cs.dirForUser(user), fnameForShard(user, seq))
540540+ fi, err := os.Create(fname)
541541+ if err != nil {
542542+ return nil, "", err
543543+ }
544544+545545+ return fi, fname, nil
546546+}
547547+548548+func (cs *FileCarStore) writeNewShardFile(ctx context.Context, user models.Uid, seq int, data []byte) (string, error) {
549549+ _, span := otel.Tracer("carstore").Start(ctx, "writeNewShardFile")
550550+ defer span.End()
551551+552552+ // TODO: some overwrite protections
553553+ fname := filepath.Join(cs.dirForUser(user), fnameForShard(user, seq))
554554+ if err := os.WriteFile(fname, data, 0664); err != nil {
555555+ return "", err
556556+ }
557557+558558+ return fname, nil
559559+}
560560+561561+func (cs *FileCarStore) deleteShardFile(ctx context.Context, sh *CarShard) error {
562562+ return os.Remove(sh.Path)
563563+}
564564+565565+// CloseWithRoot persists all blocks written in this delta session as a new
566566+// CAR shard rooted at the given cid, and returns the serialized CAR bytes.
567567+func (ds *DeltaSession) CloseWithRoot(ctx context.Context, root cid.Cid, rev string) ([]byte, error) {
568568+ ctx, span := otel.Tracer("carstore").Start(ctx, "CloseWithRoot")
569569+ defer span.End()
570570+571571+ if ds.readonly {
572572+ return nil, fmt.Errorf("cannot write to readonly deltaSession")
573573+ }
574574+575575+ return ds.cs.writeNewShard(ctx, root, rev, ds.user, ds.seq, ds.blks, ds.rmcids)
576576+}
577577+578578+func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) {
579579+ h := &car.CarHeader{
580580+ Roots: []cid.Cid{root},
581581+ Version: 1,
582582+ }
583583+ hb, err := cbor.DumpObject(h)
584584+ if err != nil {
585585+ return 0, err
586586+ }
587587+588588+ hnw, err := carstore.LdWrite(w, hb)
589589+ if err != nil {
590590+ return 0, err
591591+ }
592592+593593+ return hnw, nil
594594+}
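// The header is cbor-encoded and framed with carstore.LdWrite (a uvarint length
// prefix), and the returned byte count is what callers store as a shard's
// DataStart, i.e. the offset at which block data begins.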
595595+596596+// shardWriter.writeNewShard called from inside DeltaSession.CloseWithRoot
597597+type shardWriter interface {
598598+ // writeNewShard stores blocks in `blks` arg and creates a new shard to propagate out to our firehose
599599+ writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error)
600600+}
601601+602602+func blocksToCar(ctx context.Context, root cid.Cid, rev string, blks map[cid.Cid]blockformat.Block) ([]byte, error) {
603603+ buf := new(bytes.Buffer)
604604+ _, err := WriteCarHeader(buf, root)
605605+ if err != nil {
606606+ return nil, fmt.Errorf("failed to write car header: %w", err)
607607+ }
608608+609609+ for k, blk := range blks {
610610+ _, err := carstore.LdWrite(buf, k.Bytes(), blk.RawData())
611611+ if err != nil {
612612+ return nil, fmt.Errorf("failed to write block: %w", err)
613613+ }
614614+ }
615615+616616+ return buf.Bytes(), nil
617617+}
618618+619619+func (cs *FileCarStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) {
620620+621621+ buf := new(bytes.Buffer)
622622+ hnw, err := WriteCarHeader(buf, root)
623623+ if err != nil {
624624+ return nil, fmt.Errorf("failed to write car header: %w", err)
625625+ }
626626+627627+ // TODO: writing these blocks in map traversal order is bad; I believe the
628628+ // optimal ordering will be something like reverse-write-order, but random
629629+ // is definitely not it
630630+631631+ offset := hnw
632632+ //brefs := make([]*blockRef, 0, len(ds.blks))
633633+ brefs := make([]map[string]interface{}, 0, len(blks))
634634+ for k, blk := range blks {
635635+ nw, err := carstore.LdWrite(buf, k.Bytes(), blk.RawData())
636636+ if err != nil {
637637+ return nil, fmt.Errorf("failed to write block: %w", err)
638638+ }
639639+640640+ /*
641641+ brefs = append(brefs, &blockRef{
642642+ Cid: k.String(),
643643+ Offset: offset,
644644+ Shard: shard.ID,
645645+ })
646646+ */
647647+ // adding things to the db by map is the only way to get gorm to not
648648+ // add the 'returning' clause, which costs a lot of time
649649+ brefs = append(brefs, map[string]interface{}{
650650+ "cid": models.DbCID{CID: k},
651651+ "offset": offset,
652652+ })
653653+654654+ offset += nw
655655+ }
656656+657657+ start := time.Now()
658658+ path, err := cs.writeNewShardFile(ctx, user, seq, buf.Bytes())
659659+ if err != nil {
660660+ return nil, fmt.Errorf("failed to write shard file: %w", err)
661661+ }
662662+ writeShardFileDuration.Observe(time.Since(start).Seconds())
663663+664664+ shard := CarShard{
665665+ Root: models.DbCID{CID: root},
666666+ DataStart: hnw,
667667+ Seq: seq,
668668+ Path: path,
669669+ Usr: user,
670670+ Rev: rev,
671671+ }
672672+673673+ start = time.Now()
674674+ if err := cs.putShard(ctx, &shard, brefs, rmcids, false); err != nil {
675675+ return nil, err
676676+ }
677677+ writeShardMetadataDuration.Observe(time.Since(start).Seconds())
678678+679679+ return buf.Bytes(), nil
680680+}
681681+682682+func (cs *FileCarStore) putShard(ctx context.Context, shard *CarShard, brefs []map[string]any, rmcids map[cid.Cid]bool, nocache bool) error {
683683+ ctx, span := otel.Tracer("carstore").Start(ctx, "putShard")
684684+ defer span.End()
685685+686686+ err := cs.meta.PutShardAndRefs(ctx, shard, brefs, rmcids)
687687+ if err != nil {
688688+ return err
689689+ }
690690+691691+ if !nocache {
692692+ cs.putLastShardCache(shard)
693693+ }
694694+695695+ return nil
696696+}
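// BlockDiff computes which cids of the old tree (rooted at oldroot) become
// unreferenced once newcids are applied: it first marks everything reachable
// from the new blocks as kept, then walks the old tree from oldroot and
// collects any dag-cbor links outside the keep set (and not in skipcids) as
// the drop set.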
697697+698698+func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, newcids map[cid.Cid]blockformat.Block, skipcids map[cid.Cid]bool) (map[cid.Cid]bool, error) {
699699+ ctx, span := otel.Tracer("repo").Start(ctx, "BlockDiff")
700700+ defer span.End()
701701+702702+ if !oldroot.Defined() {
703703+ return map[cid.Cid]bool{}, nil
704704+ }
705705+706706+ // walk the entire 'new' portion of the tree, marking all referenced cids as 'keep'
707707+ keepset := make(map[cid.Cid]bool)
708708+ for c := range newcids {
709709+ keepset[c] = true
710710+ oblk, err := bs.Get(ctx, c)
711711+ if err != nil {
712712+ return nil, fmt.Errorf("get failed in new tree: %w", err)
713713+ }
714714+715715+ if err := cbg.ScanForLinks(bytes.NewReader(oblk.RawData()), func(lnk cid.Cid) {
716716+ keepset[lnk] = true
717717+ }); err != nil {
718718+ return nil, err
719719+ }
720720+ }
721721+722722+ if keepset[oldroot] {
723723+ // this should probably never happen, but is technically correct
724724+ return nil, nil
725725+ }
726726+727727+ // next, walk the old tree from the root, recursing on cids *not* in the keepset.
728728+ dropset := make(map[cid.Cid]bool)
729729+ dropset[oldroot] = true
730730+ queue := []cid.Cid{oldroot}
731731+732732+ for len(queue) > 0 {
733733+ c := queue[0]
734734+ queue = queue[1:]
735735+736736+ if skipcids != nil && skipcids[c] {
737737+ continue
738738+ }
739739+740740+ oblk, err := bs.Get(ctx, c)
741741+ if err != nil {
742742+ return nil, fmt.Errorf("get failed in old tree: %w", err)
743743+ }
744744+745745+ if err := cbg.ScanForLinks(bytes.NewReader(oblk.RawData()), func(lnk cid.Cid) {
746746+ if lnk.Prefix().Codec != cid.DagCBOR {
747747+ return
748748+ }
749749+750750+ if !keepset[lnk] {
751751+ dropset[lnk] = true
752752+ queue = append(queue, lnk)
753753+ }
754754+ }); err != nil {
755755+ return nil, err
756756+ }
757757+ }
758758+759759+ return dropset, nil
760760+}
761761+762762+func (cs *FileCarStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, carstore1.BlockStorage, error) {
763763+ ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice")
764764+ defer span.End()
765765+766766+ carr, err := car.NewCarReader(bytes.NewReader(carslice))
767767+ if err != nil {
768768+ return cid.Undef, nil, err
769769+ }
770770+771771+ if len(carr.Header.Roots) != 1 {
772772+ return cid.Undef, nil, fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots))
773773+ }
774774+775775+ ds, err := cs.NewDeltaSession(ctx, uid, since)
776776+ if err != nil {
777777+ return cid.Undef, nil, fmt.Errorf("new delta session failed: %w", err)
778778+ }
779779+780780+ var cids []cid.Cid
781781+ for {
782782+ blk, err := carr.Next()
783783+ if err != nil {
784784+ if err == io.EOF {
785785+ break
786786+ }
787787+ return cid.Undef, nil, err
788788+ }
789789+790790+ cids = append(cids, blk.Cid())
791791+792792+ if err := ds.Put(ctx, blk); err != nil {
793793+ return cid.Undef, nil, err
794794+ }
795795+ }
796796+797797+ return carr.Header.Roots[0], ds, nil
798798+}
799799+800800+func (ds *DeltaSession) CalcDiff(ctx context.Context, skipcids map[cid.Cid]bool) error {
801801+ rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks, skipcids)
802802+ if err != nil {
803803+ return fmt.Errorf("block diff failed (base=%s,rev=%s): %w", ds.baseCid, ds.lastRev, err)
804804+ }
805805+806806+ ds.rmcids = rmcids
807807+ return nil
808808+}
809809+810810+func (cs *FileCarStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) {
811811+ lastShard, err := cs.getLastShard(ctx, user)
812812+ if err != nil {
813813+ return cid.Undef, err
814814+ }
815815+ if lastShard.ID == 0 {
816816+ return cid.Undef, nil
817817+ }
818818+819819+ return lastShard.Root.CID, nil
820820+}
821821+822822+func (cs *FileCarStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) {
823823+ lastShard, err := cs.getLastShard(ctx, user)
824824+ if err != nil {
825825+ return "", err
826826+ }
827827+ if lastShard.ID == 0 {
828828+ return "", nil
829829+ }
830830+831831+ return lastShard.Rev, nil
832832+}
833833+834834+func (cs *FileCarStore) Stat(ctx context.Context, usr models.Uid) ([]carstore1.UserStat, error) {
835835+ shards, err := cs.meta.GetUserShards(ctx, usr)
836836+ if err != nil {
837837+ return nil, err
838838+ }
839839+840840+ var out []carstore1.UserStat
841841+ for _, s := range shards {
842842+ out = append(out, carstore1.UserStat{
843843+ Seq: s.Seq,
844844+ Root: s.Root.CID.String(),
845845+ Created: s.CreatedAt,
846846+ })
847847+ }
848848+849849+ return out, nil
850850+}
851851+852852+func (cs *FileCarStore) WipeUserData(ctx context.Context, user models.Uid) error {
853853+ shards, err := cs.meta.GetUserShards(ctx, user)
854854+ if err != nil {
855855+ return err
856856+ }
857857+858858+ if err := cs.deleteShards(ctx, shards); err != nil {
859859+ if !os.IsNotExist(err) {
860860+ return err
861861+ }
862862+ }
863863+864864+ cs.removeLastShardCache(user)
865865+866866+ return nil
867867+}
868868+869869+func (cs *FileCarStore) deleteShards(ctx context.Context, shs []CarShard) error {
870870+ ctx, span := otel.Tracer("carstore").Start(ctx, "deleteShards")
871871+ defer span.End()
872872+873873+ deleteSlice := func(ctx context.Context, subs []CarShard) error {
874874+ ids := make([]uint, len(subs))
875875+ for i, sh := range subs {
876876+ ids[i] = sh.ID
877877+ }
878878+879879+ err := cs.meta.DeleteShardsAndRefs(ctx, ids)
880880+ if err != nil {
881881+ return err
882882+ }
883883+884884+ for _, sh := range subs {
885885+ if err := cs.deleteShardFile(ctx, &sh); err != nil {
886886+ if !os.IsNotExist(err) {
887887+ return err
888888+ }
889889+ cs.log.Warn("shard file we tried to delete did not exist", "shard", sh.ID, "path", sh.Path)
890890+ }
891891+ }
892892+893893+ return nil
894894+ }
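	// Delete in chunks of 2000 so each DeleteShardsAndRefs call and the matching
	// file-removal pass stay bounded (keeping very large id lists out of a single
	// query is the assumed motivation here).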
895895+896896+ chunkSize := 2000
897897+ for i := 0; i < len(shs); i += chunkSize {
898898+ sl := shs[i:]
899899+ if len(sl) > chunkSize {
900900+ sl = sl[:chunkSize]
901901+ }
902902+903903+ if err := deleteSlice(ctx, sl); err != nil {
904904+ return err
905905+ }
906906+ }
907907+908908+ return nil
909909+}
910910+911911+type shardStat struct {
912912+ ID uint
913913+ Dirty int
914914+ Seq int
915915+ Total int
916916+917917+ refs []blockRef
918918+}
919919+920920+func (s shardStat) dirtyFrac() float64 {
921921+ return float64(s.Dirty) / float64(s.Total)
922922+}
923923+924924+func aggrRefs(brefs []blockRef, shards map[uint]CarShard, staleCids map[cid.Cid]bool) []shardStat {
925925+ byId := make(map[uint]*shardStat)
926926+927927+ for _, br := range brefs {
928928+ s, ok := byId[br.Shard]
929929+ if !ok {
930930+ s = &shardStat{
931931+ ID: br.Shard,
932932+ Seq: shards[br.Shard].Seq,
933933+ }
934934+ byId[br.Shard] = s
935935+ }
936936+937937+ s.Total++
938938+ if staleCids[br.Cid.CID] {
939939+ s.Dirty++
940940+ }
941941+942942+ s.refs = append(s.refs, br)
943943+ }
944944+945945+ var out []shardStat
946946+ for _, s := range byId {
947947+ out = append(out, *s)
948948+ }
949949+950950+ sort.Slice(out, func(i, j int) bool {
951951+ return out[i].Seq < out[j].Seq
952952+ })
953953+954954+ return out
955955+}
956956+957957+type compBucket struct {
958958+ shards []shardStat
959959+960960+ cleanBlocks int
961961+ expSize int
962962+}
963963+964964+func (cb *compBucket) shouldCompact() bool {
965965+ if len(cb.shards) == 0 {
966966+ return false
967967+ }
968968+969969+ if len(cb.shards) > 5 {
970970+ return true
971971+ }
972972+973973+ var frac float64
974974+ for _, s := range cb.shards {
975975+ frac += s.dirtyFrac()
976976+ }
977977+ frac /= float64(len(cb.shards))
978978+979979+ if len(cb.shards) > 3 && frac > 0.2 {
980980+ return true
981981+ }
982982+983983+ return frac > 0.4
984984+}
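// Worked example of the thresholds above: six or more shards always compact;
// four shards with dirty fractions {0.3, 0.3, 0.2, 0.1} average 0.225 > 0.2
// and compact; two shards only compact once their average dirty fraction
// exceeds 0.4.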
985985+986986+func (cb *compBucket) addShardStat(ss shardStat) {
987987+ cb.cleanBlocks += (ss.Total - ss.Dirty)
988988+ cb.shards = append(cb.shards, ss)
989989+}
990990+991991+func (cb *compBucket) isEmpty() bool {
992992+ return len(cb.shards) == 0
993993+}
994994+995995+func (cs *FileCarStore) openNewCompactedShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) {
996996+ // TODO: some overwrite protections
997997+ // NOTE CreateTemp is used for creating a non-colliding file, but we keep it and don't delete it so don't think of it as "temporary".
998998+ // Because the pattern has no '*', os.CreateTemp appends a random suffix, producing "sh-<user>-<seq><random>"
999999+ fi, err := os.CreateTemp(cs.dirForUser(user), fnameForShard(user, seq))
10001000+ if err != nil {
10011001+ return nil, "", err
10021002+ }
10031003+10041004+ return fi, fi.Name(), nil
10051005+}
10061006+10071007+type CompactionTarget struct {
10081008+ Usr models.Uid
10091009+ NumShards int
10101010+}
10111011+10121012+func (cs *FileCarStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]carstore1.CompactionTarget, error) {
10131013+ ctx, span := otel.Tracer("carstore").Start(ctx, "GetCompactionTargets")
10141014+ defer span.End()
10151015+10161016+ return cs.meta.GetCompactionTargets(ctx, shardCount)
10171017+}
10181018+10191019+func shardSize(sh *CarShard) (int64, error) {
10201020+ st, err := os.Stat(sh.Path)
10211021+ if err != nil {
10221022+ if os.IsNotExist(err) {
10231023+ slog.Warn("missing shard, return size of zero", "path", sh.Path, "shard", sh.ID, "system", "carstore")
10241024+ return 0, nil
10251025+ }
10261026+ return 0, fmt.Errorf("stat %q: %w", sh.Path, err)
10271027+ }
10281028+10291029+ return st.Size(), nil
10301030+}
10311031+10321032+func (cs *FileCarStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*carstore1.CompactionStats, error) {
10331033+ ctx, span := otel.Tracer("carstore").Start(ctx, "CompactUserShards")
10341034+ defer span.End()
10351035+10361036+ span.SetAttributes(attribute.Int64("user", int64(user)))
10371037+10381038+ shards, err := cs.meta.GetUserShards(ctx, user)
10391039+ if err != nil {
10401040+ return nil, err
10411041+ }
10421042+10431043+ _ = shards
10441044+ return nil, fmt.Errorf("TODO: have to redo all of compaction")
10451045+}
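// deleteStaleRefs rewrites the user's stale-ref set after compaction: a cid
// stays marked stale only if it still has a block reference in a shard that
// was not removed, while cids whose references all lived in removed shards are
// dropped from the set via SetStaleRef.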
10461046+10471047+func (cs *FileCarStore) deleteStaleRefs(ctx context.Context, uid models.Uid, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) error {
10481048+ ctx, span := otel.Tracer("carstore").Start(ctx, "deleteStaleRefs")
10491049+ defer span.End()
10501050+10511051+ brByCid := make(map[cid.Cid][]blockRef)
10521052+ for _, br := range brefs {
10531053+ brByCid[br.Cid.CID] = append(brByCid[br.Cid.CID], br)
10541054+ }
10551055+10561056+ var staleToKeep []cid.Cid
10571057+ for _, sr := range staleRefs {
10581058+ cids, err := sr.getCids()
10591059+ if err != nil {
10601060+ return fmt.Errorf("getCids on staleRef failed (%d): %w", sr.ID, err)
10611061+ }
10621062+10631063+ for _, c := range cids {
10641064+ brs := brByCid[c]
10651065+ del := true
10661066+ for _, br := range brs {
10671067+ if !removedShards[br.Shard] {
10681068+ del = false
10691069+ break
10701070+ }
10711071+ }
10721072+10731073+ if !del {
10741074+ staleToKeep = append(staleToKeep, c)
10751075+ }
10761076+ }
10771077+ }
10781078+10791079+ return cs.meta.SetStaleRef(ctx, uid, staleToKeep)
10801080+}
10811081+10821082+func (cs *FileCarStore) compactBucket(ctx context.Context, user models.Uid, b *compBucket, shardsById map[uint]CarShard, keep map[cid.Cid]bool) error {
10831083+ ctx, span := otel.Tracer("carstore").Start(ctx, "compactBucket")
10841084+ defer span.End()
10851085+10861086+ span.SetAttributes(attribute.Int("shards", len(b.shards)))
10871087+10881088+ last := b.shards[len(b.shards)-1]
10891089+ lastsh := shardsById[last.ID]
10901090+ fi, path, err := cs.openNewCompactedShardFile(ctx, user, last.Seq)
10911091+ if err != nil {
10921092+ return fmt.Errorf("opening new file: %w", err)
10931093+ }
10941094+10951095+ defer fi.Close()
10961096+ root := lastsh.Root.CID
10971097+10981098+ hnw, err := WriteCarHeader(fi, root)
10991099+ if err != nil {
11001100+ return err
11011101+ }
11021102+11031103+ offset := hnw
11041104+ var nbrefs []map[string]any
11051105+ written := make(map[cid.Cid]bool)
11061106+ for _, s := range b.shards {
11071107+ sh := shardsById[s.ID]
11081108+ if err := cs.iterateShardBlocks(ctx, &sh, func(blk blockformat.Block) error {
11091109+ if written[blk.Cid()] {
11101110+ return nil
11111111+ }
11121112+11131113+ if keep[blk.Cid()] {
11141114+ nw, err := carstore.LdWrite(fi, blk.Cid().Bytes(), blk.RawData())
11151115+ if err != nil {
11161116+ return fmt.Errorf("failed to write block: %w", err)
11171117+ }
11181118+11191119+ nbrefs = append(nbrefs, map[string]interface{}{
11201120+ "cid": models.DbCID{CID: blk.Cid()},
11211121+ "offset": offset,
11221122+ })
11231123+11241124+ offset += nw
11251125+ written[blk.Cid()] = true
11261126+ }
11271127+ return nil
11281128+ }); err != nil {
11291129+ // If we ever fail to iterate a shard file because it's
11301130+ // corrupted, just log an error and skip the shard
11311131+ cs.log.Error("iterating blocks in shard", "shard", s.ID, "err", err, "uid", user)
11321132+ }
11331133+ }
11341134+11351135+ shard := CarShard{
11361136+ Root: models.DbCID{CID: root},
11371137+ DataStart: hnw,
11381138+ Seq: lastsh.Seq,
11391139+ Path: path,
11401140+ Usr: user,
11411141+ Rev: lastsh.Rev,
11421142+ }
11431143+11441144+ if err := cs.putShard(ctx, &shard, nbrefs, nil, true); err != nil {
11451145+ // if writing the shard fails, we should also delete the file
11461146+ _ = fi.Close()
11471147+11481148+ if err2 := os.Remove(fi.Name()); err2 != nil {
11491149+ cs.log.Error("failed to remove shard file after failed db transaction", "path", fi.Name(), "err", err2)
11501150+ }
11511151+11521152+ return err
11531153+ }
11541154+ return nil
11551155+}