···11+package next
22+33+import (
44+ "context"
55+ "errors"
66+ "fmt"
77+ "log/slog"
88+ "net/http"
99+ "sync"
1010+ "time"
1111+1212+ "github.com/bluesky-social/indigo/repo"
1313+1414+ "github.com/ipfs/go-cid"
1515+ "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
1616+ "go.opentelemetry.io/otel"
1717+ "golang.org/x/time/rate"
1818+)
// Job describes a single repo backfill tracked by a Store.
type Job interface {
	// Repo returns the identifier of the repo this job backfills.
	Repo() string
	// State returns the job's current state string.
	State() string
	// RetryCount returns the number of retries recorded for the job.
	RetryCount() int
	// SetState moves the job to the given state.
	SetState(ctx context.Context, state string) error
}

// Store holds backfill Jobs and hands them out per PDS.
type Store interface {
	// EnqueueJob registers a backfill job for repo on the given PDS.
	EnqueueJob(ctx context.Context, pds, repo string) error
	// GetJob looks up the job for repo.
	GetJob(ctx context.Context, repo string) (Job, error)
	// GetNextEnqueuedJob returns the next job to run for pds. The producer in
	// PDSBackfiller.Start treats a nil Job with a nil error as "nothing to do".
	GetNextEnqueuedJob(ctx context.Context, pds string) (Job, error)
}
// Well-known backfill job states. They are constants so that no package can
// reassign them at runtime; the string values are unchanged.
const (
	// StateEnqueued is the state of a backfill job when it is first created
	StateEnqueued = "enqueued"
	// StateInProgress is the state of a backfill job when it is being processed
	StateInProgress = "in_progress"
	// StateComplete is the state of a backfill job when it has been processed
	StateComplete = "complete"
)
// Sentinel errors for buffering ops against a backfill job.
var (
	// ErrJobComplete is returned when trying to buffer an op for a job that is complete.
	ErrJobComplete = errors.New("job is complete")

	// ErrJobNotFound is returned when trying to buffer an op for a job that doesn't exist.
	ErrJobNotFound = errors.New("job not found")
)
4949+5050+var tracer = otel.Tracer("backfiller")
5151+5252+// A Backfiller is a generic backfiller that can handle concurrent backfill jobs across multiple PDS instances.
5353+type Backfiller struct {
5454+ Name string
5555+5656+ HandleCreateRecord func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error
5757+ Store Store
5858+5959+ perPDSBackfillConcurrency int
6060+ perPDSSyncsPerSecond int
6161+ globalRecordCreateConcurrency int
6262+6363+ globalRecordCreationLimiter *rate.Limiter
6464+6565+ NSIDFilter string
6666+6767+ pdsBackfillers map[string]*PDSBackfiller
6868+ lk sync.Mutex
6969+7070+ stop chan struct{}
7171+}
// BackfillerOptions configures a Backfiller built with NewBackfiller.
type BackfillerOptions struct {
	// PerPDSBackfillConcurrency is the number of parallel backfills per PDS.
	PerPDSBackfillConcurrency int
	// PerPDSSyncsPerSecond limits repo sync requests per second to each PDS.
	PerPDSSyncsPerSecond int
	// GlobalRecordCreateConcurrency sizes the record-creation limiter shared
	// by all PDS backfillers.
	GlobalRecordCreateConcurrency int
	// NSIDFilter is forwarded to every PDS backfiller.
	NSIDFilter string
	// Client is an optional HTTP client.
	// NOTE(review): NewBackfiller does not read this field in the visible code.
	Client *http.Client
}

// DefaultBackfillerOptions returns a fresh options value with the package
// defaults; Client is left nil.
func DefaultBackfillerOptions() *BackfillerOptions {
	opts := new(BackfillerOptions)
	opts.PerPDSBackfillConcurrency = 10
	opts.PerPDSSyncsPerSecond = 2
	opts.GlobalRecordCreateConcurrency = 100
	opts.NSIDFilter = ""
	return opts
}
8989+9090+func NewBackfiller(
9191+ name string,
9292+ store Store,
9393+ handleCreate func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error,
9494+ opts *BackfillerOptions,
9595+) *Backfiller {
9696+ if opts == nil {
9797+ opts = DefaultBackfillerOptions()
9898+ }
9999+100100+ return &Backfiller{
101101+ Name: name,
102102+ Store: store,
103103+ HandleCreateRecord: handleCreate,
104104+ perPDSBackfillConcurrency: opts.PerPDSBackfillConcurrency,
105105+ perPDSSyncsPerSecond: opts.PerPDSSyncsPerSecond,
106106+ globalRecordCreateConcurrency: opts.GlobalRecordCreateConcurrency,
107107+ globalRecordCreationLimiter: rate.NewLimiter(rate.Limit(opts.GlobalRecordCreateConcurrency), opts.GlobalRecordCreateConcurrency),
108108+ NSIDFilter: opts.NSIDFilter,
109109+ pdsBackfillers: make(map[string]*PDSBackfiller),
110110+ stop: make(chan struct{}),
111111+ }
112112+}
113113+114114+func (b *Backfiller) EnqueueJob(ctx context.Context, pds, repo string) error {
115115+ log := slog.With("component", "backfiller", "name", b.Name, "pds", pds, "repo", repo)
116116+ log.Info("enqueueing backfill job")
117117+118118+ if err := b.Store.EnqueueJob(ctx, pds, repo); err != nil {
119119+ log.Error("failed to enqueue backfill job", "error", err)
120120+ return err
121121+ }
122122+123123+ // Check if we already have a backfiller for this PDS
124124+ b.lk.Lock()
125125+ defer b.lk.Unlock()
126126+ if _, exists := b.pdsBackfillers[pds]; !exists {
127127+ log.Info("creating new PDS backfiller", "pds", pds)
128128+ opts := DefaultPDSBackfillerOptions()
129129+ opts.ParallelBackfills = b.perPDSBackfillConcurrency
130130+ opts.SyncRequestsPerSecond = b.perPDSSyncsPerSecond
131131+ opts.RecordCreateLimiter = b.globalRecordCreationLimiter
132132+ opts.NSIDFilter = b.NSIDFilter
133133+134134+ pdsBackfiller := NewPDSBackfiller(pds, pds, b.Store, b.HandleCreateRecord, opts)
135135+ b.pdsBackfillers[pds] = pdsBackfiller
136136+ pdsBackfiller.Start()
137137+ }
138138+ backfillJobsEnqueued.WithLabelValues(b.Name).Inc()
139139+ log.Info("backfill job enqueued successfully")
140140+ return nil
141141+}
142142+143143+func (b *Backfiller) Shutdown(ctx context.Context) error {
144144+ log := slog.With("component", "backfiller", "name", b.Name)
145145+ log.Info("shutting down backfiller")
146146+ close(b.stop)
147147+ b.lk.Lock()
148148+ defer b.lk.Unlock()
149149+ // Concurrently stop all PDS backfillers
150150+ var wg sync.WaitGroup
151151+ for _, pdsBackfiller := range b.pdsBackfillers {
152152+ wg.Add(1)
153153+ go func(pds *PDSBackfiller) {
154154+ defer wg.Done()
155155+ pds.Stop(ctx)
156156+ }(pdsBackfiller)
157157+ }
158158+ wg.Wait()
159159+ log.Info("all PDS backfillers stopped")
160160+ return nil
161161+}
162162+163163+type PDSBackfiller struct {
164164+ Name string
165165+ Hostname string
166166+ client *http.Client
167167+168168+ HandleCreateRecord func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error
169169+ Store Store
170170+171171+ backfillConcurrency int
172172+ syncLimiter *rate.Limiter
173173+174174+ recordCreateConcurrency int
175175+ recordCreateLimiter *rate.Limiter
176176+177177+ NSIDFilter string
178178+179179+ wg sync.WaitGroup
180180+ stop chan struct{}
181181+}
182182+183183+type PDSBackfillerOptions struct {
184184+ ParallelBackfills int
185185+ ParallelRecordCreates int
186186+ RecordCreateLimiter *rate.Limiter
187187+ NSIDFilter string
188188+ SyncRequestsPerSecond int
189189+ Client *http.Client
190190+}
191191+192192+func DefaultPDSBackfillerOptions() *PDSBackfillerOptions {
193193+ return &PDSBackfillerOptions{
194194+ ParallelBackfills: 10,
195195+ ParallelRecordCreates: 100,
196196+ RecordCreateLimiter: rate.NewLimiter(rate.Limit(100), 100),
197197+ NSIDFilter: "",
198198+ SyncRequestsPerSecond: 2,
199199+ Client: &http.Client{
200200+ Transport: otelhttp.NewTransport(http.DefaultTransport),
201201+ Timeout: 600 * time.Second,
202202+ },
203203+ }
204204+}
205205+206206+// NewPDSBackfiller creates a new backfiller for a single PDS instance
207207+func NewPDSBackfiller(
208208+ name string,
209209+ hostname string,
210210+ store Store,
211211+ handleCreate func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error,
212212+ opts *PDSBackfillerOptions,
213213+) *PDSBackfiller {
214214+ if opts == nil {
215215+ opts = DefaultPDSBackfillerOptions()
216216+ }
217217+218218+ return &PDSBackfiller{
219219+ Name: name,
220220+ Hostname: hostname,
221221+ Store: store,
222222+ HandleCreateRecord: handleCreate,
223223+ backfillConcurrency: opts.ParallelBackfills,
224224+ recordCreateConcurrency: opts.ParallelRecordCreates,
225225+ NSIDFilter: opts.NSIDFilter,
226226+ syncLimiter: rate.NewLimiter(rate.Limit(opts.SyncRequestsPerSecond), opts.SyncRequestsPerSecond),
227227+ recordCreateLimiter: opts.RecordCreateLimiter,
228228+ stop: make(chan struct{}),
229229+ client: opts.Client,
230230+ wg: sync.WaitGroup{},
231231+ }
232232+}
233233+234234+// Start starts the backfill processor routine
235235+func (b *PDSBackfiller) Start() {
236236+ ctx := context.Background()
237237+238238+ log := slog.With("component", "backfiller", "name", b.Name, "hostname", b.Hostname)
239239+ log.Info("starting backfill processor for PDS", "hostname", b.Hostname)
240240+241241+ // Start a producer to enqueue jobs
242242+ jobs := make(chan Job)
243243+244244+ b.wg.Add(1)
245245+ go func() {
246246+ defer b.wg.Done()
247247+ defer close(jobs)
248248+ log := log.With("subcomponent", "producer")
249249+ for {
250250+ select {
251251+ case <-b.stop:
252252+ log.Info("stopping backfill producer")
253253+ return
254254+ default:
255255+ }
256256+257257+ // Get the next job
258258+ job, err := b.Store.GetNextEnqueuedJob(ctx, b.Hostname)
259259+ if err != nil {
260260+ log.Error("failed to get next enqueued job", "error", err)
261261+ time.Sleep(1 * time.Second)
262262+ continue
263263+ } else if job == nil {
264264+ time.Sleep(1 * time.Second)
265265+ continue
266266+ }
267267+ jobs <- job
268268+ }
269269+ }()
270270+271271+ // Start the worker processes
272272+ for i := 0; i < b.backfillConcurrency; i++ {
273273+ b.wg.Add(1)
274274+ go func() {
275275+ defer b.wg.Done()
276276+ log := log.With("subcomponent", "worker", "worker_id", i)
277277+ for job := range jobs {
278278+ select {
279279+ case <-b.stop:
280280+ log.Info("stopping backfill worker")
281281+ return
282282+ default:
283283+ }
284284+285285+ log := log.With("job", job.Repo(), "state", job.State())
286286+ log.Info("processing backfill job")
287287+288288+ if err := job.SetState(ctx, StateInProgress); err != nil {
289289+ log.Error("failed to set job state to in_progress", "error", err)
290290+ continue
291291+ }
292292+293293+ newState, err := b.BackfillRepo(ctx, job)
294294+ if err != nil {
295295+ log.Error("failed to backfill repo", "error", err)
296296+ } else {
297297+ log.Info("backfill job completed successfully", "new_state", newState)
298298+ }
299299+300300+ if err := job.SetState(ctx, newState); err != nil {
301301+ log.Error("failed to set job completion state", "error", err)
302302+ continue
303303+ }
304304+305305+ backfillJobsProcessed.WithLabelValues(b.Name).Inc()
306306+ }
307307+ }()
308308+ }
309309+}
310310+311311+// Stop stops the backfill processor
312312+func (b *PDSBackfiller) Stop(ctx context.Context) {
313313+ log := slog.With("source", "backfiller", "name", b.Name, "hostname", b.Hostname)
314314+ log.Info("stopping PDS backfiller")
315315+ close(b.stop)
316316+ b.wg.Wait()
317317+ log.Info("PDS backfiller stopped")
318318+}
319319+320320+type recordQueueItem struct {
321321+ recordPath string
322322+ nodeCid cid.Cid
323323+}
324324+325325+type recordResult struct {
326326+ recordPath string
327327+ err error
328328+}
// FetchRepoError is returned by fetchRepo when the PDS answers a getRepo
// request with a non-200 status.
type FetchRepoError struct {
	// StatusCode is the numeric HTTP status code of the response.
	StatusCode int
	// Status is the HTTP status line text of the response.
	Status string
}

// Error implements the error interface. A 400 response is reported as
// "repo not found"; any other response is reported with its status text, or
// "unknown error" when no status text was recorded.
func (e *FetchRepoError) Error() string {
	var reason string
	switch {
	case e.StatusCode == http.StatusBadRequest:
		reason = "repo not found"
	case e.Status != "":
		reason = e.Status
	default:
		// Previously this fallback was assigned and then always overwritten,
		// leaving an empty reason when Status was empty.
		reason = "unknown error"
	}
	return fmt.Sprintf("failed to get repo: %s (%d)", reason, e.StatusCode)
}
344344+345345+// BackfillRepo backfills a repo
346346+func (b *PDSBackfiller) BackfillRepo(ctx context.Context, job Job) (string, error) {
347347+ ctx, span := tracer.Start(ctx, "BackfillRepo")
348348+ defer span.End()
349349+350350+ start := time.Now()
351351+352352+ repoDID := job.Repo()
353353+354354+ log := slog.With("source", "backfiller_backfill_repo", "repo", repoDID)
355355+ if job.RetryCount() > 0 {
356356+ log = log.With("retry_count", job.RetryCount())
357357+ }
358358+ log.Info(fmt.Sprintf("processing backfill for %s", repoDID))
359359+360360+ r, err := b.fetchRepo(ctx, repoDID, b.Hostname)
361361+ if err != nil {
362362+ slog.Warn("repo CAR fetch from PDS failed", "did", repoDID, "pds", b.Hostname, "err", err)
363363+ rfe, ok := err.(*FetchRepoError)
364364+ if ok {
365365+ return fmt.Sprintf("failed to fetch repo CAR from PDS (http %d:%s)", rfe.StatusCode, rfe.Status), err
366366+ }
367367+ return "failed to fetch repo CAR from PDS", err
368368+ }
369369+370370+ numRecords := 0
371371+ numRoutines := b.recordCreateConcurrency
372372+ recordQueue := make(chan recordQueueItem, numRoutines)
373373+ recordResults := make(chan recordResult, numRoutines)
374374+375375+ // Producer routine
376376+ go func() {
377377+ defer close(recordQueue)
378378+ if err := r.ForEach(ctx, b.NSIDFilter, func(recordPath string, nodeCid cid.Cid) error {
379379+ numRecords++
380380+ recordQueue <- recordQueueItem{recordPath: recordPath, nodeCid: nodeCid}
381381+ return nil
382382+ }); err != nil {
383383+ log.Error("failed to iterate records in repo", "err", err)
384384+ }
385385+ }()
386386+387387+ rev := r.SignedCommit().Rev
388388+389389+ // Consumer routines
390390+ wg := sync.WaitGroup{}
391391+ for i := 0; i < numRoutines; i++ {
392392+ wg.Add(1)
393393+ go func() {
394394+ defer wg.Done()
395395+ for item := range recordQueue {
396396+ blk, err := r.Blockstore().Get(ctx, item.nodeCid)
397397+ if err != nil {
398398+ recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to get blocks for record: %w", err)}
399399+ continue
400400+ }
401401+402402+ raw := blk.RawData()
403403+404404+ if err := b.recordCreateLimiter.Wait(ctx); err != nil {
405405+ recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to wait for record create limiter: %w", err)}
406406+ break
407407+ }
408408+409409+ err = b.HandleCreateRecord(ctx, repoDID, rev, item.recordPath, &raw, &item.nodeCid)
410410+ if err != nil {
411411+ recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)}
412412+ continue
413413+ }
414414+415415+ backfillRecordsProcessed.WithLabelValues(b.Name).Inc()
416416+ recordResults <- recordResult{recordPath: item.recordPath, err: err}
417417+ }
418418+ }()
419419+ }
420420+421421+ resultWG := sync.WaitGroup{}
422422+ resultWG.Add(1)
423423+ // Handle results
424424+ go func() {
425425+ defer resultWG.Done()
426426+ for result := range recordResults {
427427+ if result.err != nil {
428428+ log.Error("Error processing record", "record", result.recordPath, "error", result.err)
429429+ }
430430+ }
431431+ }()
432432+433433+ wg.Wait()
434434+ close(recordResults)
435435+ resultWG.Wait()
436436+437437+ log.Info("backfill complete",
438438+ "records_backfilled", numRecords,
439439+ "duration", time.Since(start),
440440+ )
441441+442442+ return StateComplete, nil
443443+}
444444+445445+// Fetches a repo CAR file over HTTP from the indicated host. If successful, parses the CAR and returns repo.Repo
446446+func (b *PDSBackfiller) fetchRepo(ctx context.Context, did, host string) (*repo.Repo, error) {
447447+ url := fmt.Sprintf("https://%s/xrpc/com.atproto.sync.getRepo?did=%s", host, did)
448448+449449+ req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
450450+ if err != nil {
451451+ return nil, fmt.Errorf("failed to create request: %w", err)
452452+ }
453453+454454+ req.Header.Set("Accept", "application/vnd.ipld.car")
455455+ req.Header.Set("User-Agent", fmt.Sprintf("atproto-backfill-%s/2.0.0", b.Name))
456456+457457+ b.syncLimiter.Wait(ctx)
458458+459459+ resp, err := b.client.Do(req)
460460+ if err != nil {
461461+ return nil, fmt.Errorf("failed to send request: %w", err)
462462+ }
463463+464464+ if resp.StatusCode != http.StatusOK {
465465+ return nil, &FetchRepoError{
466466+ StatusCode: resp.StatusCode,
467467+ Status: resp.Status,
468468+ }
469469+ }
470470+471471+ instrumentedReader := instrumentedReader{
472472+ source: resp.Body,
473473+ counter: backfillBytesProcessed.WithLabelValues(b.Name),
474474+ }
475475+476476+ defer instrumentedReader.Close()
477477+478478+ repo, err := repo.ReadRepoFromCar(ctx, instrumentedReader)
479479+ if err != nil {
480480+ return nil, fmt.Errorf("failed to parse repo from CAR file: %w", err)
481481+ }
482482+ return repo, nil
483483+}
+374
backfill/next/gormstore.go
···11+package next
22+33+import (
44+ "context"
55+ "errors"
66+ "fmt"
77+ "strings"
88+ "sync"
99+ "time"
1010+1111+ "gorm.io/gorm"
1212+)
1313+1414+type Gormjob struct {
1515+ repo string
1616+ pds string
1717+ state string
1818+ rev string
1919+2020+ lk sync.Mutex
2121+2222+ dbj *GormDBJob
2323+ db *gorm.DB
2424+2525+ createdAt time.Time
2626+ updatedAt time.Time
2727+2828+ retryCount int
2929+ retryAfter *time.Time
3030+}
3131+3232+type GormDBJob struct {
3333+ gorm.Model
3434+ Repo string `gorm:"unique;index"`
3535+ PDS string `gorm:"index;index:enqueued_pds_job_idx;index:retryable_pds_job_idx"`
3636+ State string `gorm:"index:enqueued_pds_job_idx,where:state = 'enqueued';index:retryable_pds_job_idx,where:state like 'failed%'"`
3737+ Rev string
3838+ RetryCount int
3939+ RetryAfter *time.Time `gorm:"index:retryable_pds_job_idx,sort:desc"`
4040+}
// queue is the list of pending repos for one PDS.
type queue struct {
	// qlk is held while taskQueue is read or modified.
	qlk sync.Mutex
	// taskQueue holds repo identifiers; entries are appended at the back and
	// taken from the front.
	taskQueue []string
}
4646+4747+// Gormstore is a gorm-backed implementation of the Backfill Store interface
4848+type Gormstore struct {
4949+ lk sync.RWMutex
5050+ jobs map[string]*Gormjob
5151+5252+ pdsQueues map[string]*queue
5353+5454+ db *gorm.DB
5555+}
5656+5757+func NewGormstore(db *gorm.DB) *Gormstore {
5858+ return &Gormstore{
5959+ jobs: make(map[string]*Gormjob),
6060+ pdsQueues: make(map[string]*queue),
6161+ db: db,
6262+ }
6363+}
6464+6565+func (s *Gormstore) LoadJobs(ctx context.Context) error {
6666+ s.lk.Lock()
6767+ defer s.lk.Unlock()
6868+ return s.loadJobs(ctx, 20_000)
6969+}
7070+7171+type todoJob struct {
7272+ pds string
7373+ repo string
7474+}
7575+7676+func (s *Gormstore) loadJobs(ctx context.Context, limit int) error {
7777+ enqueuedIndexClause := ""
7878+ retryableIndexClause := ""
7979+8080+ // If the DB is a SQLite DB, we can use INDEXED BY to speed up the query
8181+ if s.db.Dialector.Name() == "sqlite" {
8282+ enqueuedIndexClause = "INDEXED BY enqueued_pds_job_idx"
8383+ retryableIndexClause = "INDEXED BY retryable_pds_job_idx"
8484+ }
8585+8686+ enqueuedSelect := fmt.Sprintf(`SELECT pds, repo FROM gorm_db_jobs %s WHERE state = 'enqueued' LIMIT ?`, enqueuedIndexClause)
8787+ retryableSelect := fmt.Sprintf(`SELECT pds, repo FROM gorm_db_jobs %s WHERE state like 'failed%%' AND (retry_after = NULL OR retry_after < ?) LIMIT ?`, retryableIndexClause)
8888+8989+ todoJobs := make([]todoJob, 0, limit)
9090+ if err := s.db.Raw(enqueuedSelect, limit).Scan(&todoJobs).Error; err != nil {
9191+ return err
9292+ }
9393+9494+ if len(todoJobs) < limit {
9595+ moreTodo := make([]todoJob, 0, limit-len(todoJobs))
9696+ if err := s.db.Raw(retryableSelect, time.Now(), limit-len(todoJobs)).Scan(&moreTodo).Error; err != nil {
9797+ return err
9898+ }
9999+ todoJobs = append(todoJobs, moreTodo...)
100100+ }
101101+102102+ for _, job := range todoJobs {
103103+ if pdsQueue, ok := s.pdsQueues[job.pds]; ok {
104104+ pdsQueue.qlk.Lock()
105105+ pdsQueue.taskQueue = append(pdsQueue.taskQueue, job.repo)
106106+ pdsQueue.qlk.Unlock()
107107+ } else {
108108+ s.pdsQueues[job.pds] = &queue{
109109+ taskQueue: []string{job.repo},
110110+ }
111111+ }
112112+ }
113113+114114+ return nil
115115+}
116116+117117+func (s *Gormstore) GetOrCreateJob(ctx context.Context, pds, repo, state string) (Job, error) {
118118+ j, err := s.getJob(ctx, repo)
119119+ if err == nil {
120120+ return j, nil
121121+ }
122122+123123+ if !errors.Is(err, ErrJobNotFound) {
124124+ return nil, err
125125+ }
126126+127127+ if err := s.createJobForRepo(pds, repo, state); err != nil {
128128+ return nil, err
129129+ }
130130+131131+ return s.getJob(ctx, repo)
132132+}
133133+134134+func (s *Gormstore) EnqueueJob(ctx context.Context, pds, repo string) error {
135135+ _, err := s.GetOrCreateJob(ctx, pds, repo, StateEnqueued)
136136+ if err != nil {
137137+ return err
138138+ }
139139+140140+ // Add the job to the task queue for the PDS
141141+ s.lk.Lock()
142142+ pdsQueue, ok := s.pdsQueues[pds]
143143+ if !ok {
144144+ pdsQueue = &queue{
145145+ taskQueue: []string{repo},
146146+ }
147147+ s.pdsQueues[pds] = pdsQueue
148148+ s.lk.Unlock()
149149+ } else {
150150+ s.lk.Unlock()
151151+ pdsQueue.qlk.Lock()
152152+ pdsQueue.taskQueue = append(pdsQueue.taskQueue, repo)
153153+ pdsQueue.qlk.Unlock()
154154+ }
155155+156156+ return nil
157157+}
158158+159159+func (s *Gormstore) EnqueueJobWithState(ctx context.Context, pds, repo, state string) error {
160160+ _, err := s.GetOrCreateJob(ctx, pds, repo, state)
161161+ if err != nil {
162162+ return err
163163+ }
164164+165165+ // Add the job to the task queue for the PDS
166166+ s.lk.Lock()
167167+ pdsQueue, ok := s.pdsQueues[pds]
168168+ if !ok {
169169+ pdsQueue = &queue{
170170+ taskQueue: []string{repo},
171171+ }
172172+ s.pdsQueues[pds] = pdsQueue
173173+ s.lk.Unlock()
174174+ } else {
175175+ s.lk.Unlock()
176176+ pdsQueue.qlk.Lock()
177177+ pdsQueue.taskQueue = append(pdsQueue.taskQueue, repo)
178178+ pdsQueue.qlk.Unlock()
179179+ }
180180+181181+ return nil
182182+}
183183+184184+func (s *Gormstore) createJobForRepo(pds, repo, state string) error {
185185+ dbj := &GormDBJob{
186186+ Repo: repo,
187187+ PDS: pds,
188188+ State: state,
189189+ }
190190+ if err := s.db.Create(dbj).Error; err != nil {
191191+ if errors.Is(err, gorm.ErrDuplicatedKey) {
192192+ return nil
193193+ }
194194+ return err
195195+ }
196196+197197+ s.lk.Lock()
198198+ defer s.lk.Unlock()
199199+200200+ // Convert it to an in-memory job
201201+ if _, ok := s.jobs[repo]; ok {
202202+ // The DB create should have errored if the job already existed, but just in case
203203+ return fmt.Errorf("job already exists for repo %s", repo)
204204+ }
205205+206206+ j := &Gormjob{
207207+ repo: repo,
208208+ pds: pds,
209209+ createdAt: time.Now(),
210210+ updatedAt: time.Now(),
211211+ state: state,
212212+213213+ dbj: dbj,
214214+ db: s.db,
215215+ }
216216+ s.jobs[repo] = j
217217+218218+ return nil
219219+}
220220+221221+func (s *Gormstore) GetJob(ctx context.Context, repo string) (Job, error) {
222222+ return s.getJob(ctx, repo)
223223+}
224224+225225+func (s *Gormstore) getJob(ctx context.Context, repo string) (*Gormjob, error) {
226226+ cj := s.checkJobCache(ctx, repo)
227227+ if cj != nil {
228228+ return cj, nil
229229+ }
230230+231231+ return s.loadJob(ctx, repo)
232232+}
233233+234234+func (s *Gormstore) loadJob(ctx context.Context, repo string) (*Gormjob, error) {
235235+ var dbj GormDBJob
236236+ if err := s.db.Find(&dbj, "repo = ?", repo).Error; err != nil {
237237+ return nil, err
238238+ }
239239+240240+ if dbj.ID == 0 {
241241+ return nil, ErrJobNotFound
242242+ }
243243+244244+ j := &Gormjob{
245245+ repo: dbj.Repo,
246246+ pds: dbj.PDS,
247247+ state: dbj.State,
248248+ rev: dbj.Rev,
249249+ createdAt: dbj.CreatedAt,
250250+ updatedAt: dbj.UpdatedAt,
251251+252252+ dbj: &dbj,
253253+ db: s.db,
254254+255255+ retryCount: dbj.RetryCount,
256256+ retryAfter: dbj.RetryAfter,
257257+ }
258258+ s.lk.Lock()
259259+ defer s.lk.Unlock()
260260+ // would imply a race condition
261261+ exist, ok := s.jobs[repo]
262262+ if ok {
263263+ return exist, nil
264264+ }
265265+ s.jobs[repo] = j
266266+ return j, nil
267267+}
268268+269269+func (s *Gormstore) checkJobCache(ctx context.Context, repo string) *Gormjob {
270270+ s.lk.RLock()
271271+ defer s.lk.RUnlock()
272272+273273+ j, ok := s.jobs[repo]
274274+ if !ok || j == nil {
275275+ return nil
276276+ }
277277+ return j
278278+}
279279+280280+func (s *Gormstore) GetNextEnqueuedJob(ctx context.Context, pds string) (Job, error) {
281281+ s.lk.Lock()
282282+ pdsQueue, ok := s.pdsQueues[pds]
283283+ s.lk.Unlock()
284284+ if !ok {
285285+ return nil, nil
286286+ }
287287+ pdsQueue.qlk.Lock()
288288+ defer pdsQueue.qlk.Unlock()
289289+290290+ if len(pdsQueue.taskQueue) == 0 {
291291+ if err := s.loadJobs(ctx, 1000); err != nil {
292292+ return nil, err
293293+ }
294294+295295+ if len(pdsQueue.taskQueue) == 0 {
296296+ return nil, nil
297297+ }
298298+ }
299299+300300+ for len(pdsQueue.taskQueue) > 0 {
301301+ first := pdsQueue.taskQueue[0]
302302+ pdsQueue.taskQueue = pdsQueue.taskQueue[1:]
303303+304304+ j, err := s.getJob(ctx, first)
305305+ if err != nil {
306306+ return nil, err
307307+ }
308308+309309+ shouldRetry := strings.HasPrefix(j.State(), "failed") && j.retryAfter != nil && time.Now().After(*j.retryAfter)
310310+311311+ if j.State() == StateEnqueued || shouldRetry {
312312+ return j, nil
313313+ }
314314+ }
315315+ return nil, nil
316316+}
317317+318318+func (j *Gormjob) Repo() string {
319319+ return j.repo
320320+}
321321+322322+func (j *Gormjob) State() string {
323323+ j.lk.Lock()
324324+ defer j.lk.Unlock()
325325+326326+ return j.state
327327+}
// MaxRetries is the maximum number of times to retry a backfill job. SetState
// stops scheduling retries for a failed job once its retry count reaches this
// value.
var MaxRetries int = 10
// computeExponentialBackoff returns the delay before retry number attempt:
// 10 seconds doubled once per attempt (10s, 20s, 40s, ...).
//
// The exponent is clamped to [0, 29]. Without the clamp, attempts of 30 or
// more overflowed time.Duration and produced negative or zero delays; 29 is
// the largest exponent for which 10s * 2^n still fits. Negative attempts are
// treated as attempt 0.
func computeExponentialBackoff(attempt int) time.Duration {
	const (
		baseDelay = 10 * time.Second
		maxShift  = 29
	)

	if attempt < 0 {
		attempt = 0
	}
	if attempt > maxShift {
		attempt = maxShift
	}
	return baseDelay << uint(attempt)
}
335335+336336+func (j *Gormjob) SetState(ctx context.Context, state string) error {
337337+ j.lk.Lock()
338338+ defer j.lk.Unlock()
339339+340340+ j.state = state
341341+ j.updatedAt = time.Now()
342342+343343+ if strings.HasPrefix(state, "failed") {
344344+ if j.retryCount < MaxRetries {
345345+ next := time.Now().Add(computeExponentialBackoff(j.retryCount))
346346+ j.retryAfter = &next
347347+ j.retryCount++
348348+ } else {
349349+ j.retryAfter = nil
350350+ }
351351+ }
352352+353353+ // Persist the job to the database
354354+ j.dbj.State = state
355355+ return j.db.Save(j.dbj).Error
356356+}
357357+358358+func (j *Gormjob) RetryCount() int {
359359+ j.lk.Lock()
360360+ defer j.lk.Unlock()
361361+ return j.retryCount
362362+}
363363+364364+func (s *Gormstore) PurgeRepo(ctx context.Context, repo string) error {
365365+ if err := s.db.Exec("DELETE FROM gorm_db_jobs WHERE repo = ?", repo).Error; err != nil {
366366+ return err
367367+ }
368368+369369+ s.lk.Lock()
370370+ defer s.lk.Unlock()
371371+ delete(s.jobs, repo)
372372+373373+ return nil
374374+}
+26
backfill/next/metrics.go
···11+package next
22+33+import (
44+ "github.com/prometheus/client_golang/prometheus"
55+ "github.com/prometheus/client_golang/prometheus/promauto"
66+)
77+88+var backfillJobsEnqueued = promauto.NewCounterVec(prometheus.CounterOpts{
99+ Name: "backfill_jobs_enqueued_total",
1010+ Help: "The total number of backfill jobs enqueued",
1111+}, []string{"backfiller_name"})
1212+1313+var backfillJobsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{
1414+ Name: "backfill_jobs_processed_total",
1515+ Help: "The total number of backfill jobs processed",
1616+}, []string{"backfiller_name"})
1717+1818+var backfillRecordsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{
1919+ Name: "backfill_records_processed_total",
2020+ Help: "The total number of backfill records processed",
2121+}, []string{"backfiller_name"})
2222+2323+var backfillBytesProcessed = promauto.NewCounterVec(prometheus.CounterOpts{
2424+ Name: "backfill_bytes_processed_total",
2525+ Help: "The total number of backfill bytes processed",
2626+}, []string{"backfiller_name"})
+33
backfill/next/util.go
···11+package next
22+33+import (
44+ "io"
55+66+ "github.com/prometheus/client_golang/prometheus"
77+)
88+99+type instrumentedReader struct {
1010+ source io.ReadCloser
1111+ counter prometheus.Counter
1212+}
1313+1414+func (r instrumentedReader) Read(b []byte) (int, error) {
1515+ n, err := r.source.Read(b)
1616+ r.counter.Add(float64(n))
1717+ return n, err
1818+}
1919+2020+func (r instrumentedReader) Close() error {
2121+ var buf [32]byte
2222+ var n int
2323+ var err error
2424+ for err == nil {
2525+ n, err = r.source.Read(buf[:])
2626+ r.counter.Add(float64(n))
2727+ }
2828+ closeerr := r.source.Close()
2929+ if err != nil && err != io.EOF {
3030+ return err
3131+ }
3232+ return closeerr
3333+}