this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix control flow on backfill

Jaz f9e61880 1f363b2a

+26 -23
+6 -5
backfill/next/backfill.go
··· 118 118 log := slog.With("component", "backfiller", "name", b.Name, "pds", pds, "repo", repo) 119 119 log.Debug("enqueueing backfill job") 120 120 121 - if err := b.Store.EnqueueJob(ctx, pds, repo); err != nil { 122 - log.Error("failed to enqueue backfill job", "error", err) 123 - return err 124 - } 125 - 126 121 // Check if we already have a backfiller for this PDS 127 122 b.lk.Lock() 128 123 defer b.lk.Unlock() ··· 145 140 b.pdsBackfillers[pds] = pdsBackfiller 146 141 pdsBackfiller.Start() 147 142 } 143 + 144 + if err := b.Store.EnqueueJob(ctx, pds, repo); err != nil { 145 + log.Error("failed to enqueue backfill job", "error", err) 146 + return err 147 + } 148 + 148 149 backfillJobsEnqueued.WithLabelValues(b.Name).Inc() 149 150 log.Debug("backfill job enqueued successfully") 150 151 return nil
+20 -18
backfill/next/gormstore.go
··· 69 69 } 70 70 71 71 type todoJob struct { 72 - pds string 73 - repo string 72 + PDS string 73 + Repo string 74 74 } 75 75 76 76 func (s *Gormstore) loadJobs(ctx context.Context, limit int) error { ··· 100 100 } 101 101 102 102 for _, job := range todoJobs { 103 - if pdsQueue, ok := s.pdsQueues[job.pds]; ok { 103 + if pdsQueue, ok := s.pdsQueues[job.PDS]; ok { 104 104 pdsQueue.qlk.Lock() 105 - pdsQueue.taskQueue = append(pdsQueue.taskQueue, job.repo) 105 + pdsQueue.taskQueue = append(pdsQueue.taskQueue, job.Repo) 106 106 pdsQueue.qlk.Unlock() 107 107 } else { 108 - s.pdsQueues[job.pds] = &queue{ 109 - taskQueue: []string{job.repo}, 108 + s.pdsQueues[job.PDS] = &queue{ 109 + taskQueue: []string{job.Repo}, 110 110 } 111 111 } 112 112 } ··· 140 140 todoJobs = append(todoJobs, moreTodo...) 141 141 } 142 142 143 + // A PDS Queue should always exist for a PDS 144 + // The lock on the PDS queue should be held by the caller 145 + pdsQueue := s.pdsQueues[pds] 143 146 for _, job := range todoJobs { 144 - if pdsQueue, ok := s.pdsQueues[job.pds]; ok { 145 - pdsQueue.qlk.Lock() 146 - pdsQueue.taskQueue = append(pdsQueue.taskQueue, job.repo) 147 - pdsQueue.qlk.Unlock() 148 - } else { 149 - s.pdsQueues[job.pds] = &queue{ 150 - taskQueue: []string{job.repo}, 151 - } 152 - } 147 + pdsQueue.taskQueue = append(pdsQueue.taskQueue, job.Repo) 153 148 } 154 149 155 150 return nil ··· 173 168 } 174 169 175 170 func (s *Gormstore) EnqueueJob(ctx context.Context, pds, repo string) error { 176 - _, err := s.GetOrCreateJob(ctx, pds, repo, StateEnqueued) 171 + j, err := s.GetOrCreateJob(ctx, pds, repo, StateEnqueued) 177 172 if err != nil { 178 173 return err 174 + } 175 + 176 + if j.State() == StateComplete { 177 + return nil // Job is already complete, no need to enqueue again 179 178 } 180 179 181 180 // Add the job to the task queue for the PDS ··· 321 320 func (s *Gormstore) GetNextEnqueuedJob(ctx context.Context, pds string) (Job, error) { 322 321 s.lk.Lock() 323 322 pdsQueue, ok := s.pdsQueues[pds] 324 - s.lk.Unlock() 325 323 if !ok { 326 - return nil, nil 324 + pdsQueue = &queue{ 325 + taskQueue: []string{}, 326 + } 327 + s.pdsQueues[pds] = pdsQueue 327 328 } 329 + s.lk.Unlock() 328 330 pdsQueue.qlk.Lock() 329 331 defer pdsQueue.qlk.Unlock() 330 332