this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Make the backfiller aware of revs and fill gaps (#454)

authored by

Whyrusleeping and committed by
GitHub
a857fdc1 18ae508a

+463 -258
+138 -8
backfill/backfill.go
··· 1 1 package backfill 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 "errors" 6 7 "fmt" ··· 9 10 "sync" 10 11 "time" 11 12 13 + "github.com/bluesky-social/indigo/api/atproto" 12 14 // Blank import to register types for CBORGEN 13 15 _ "github.com/bluesky-social/indigo/api/bsky" 14 16 lexutil "github.com/bluesky-social/indigo/lex/util" ··· 25 27 type Job interface { 26 28 Repo() string 27 29 State() string 30 + Rev() string 28 31 SetState(ctx context.Context, state string) error 32 + SetRev(ctx context.Context, rev string) error 33 + RetryCount() int 29 34 35 + BufferOps(ctx context.Context, since *string, rev string, ops []*bufferedOp) (bool, error) 30 36 // FlushBufferedOps calls the given callback for each buffered operation 31 37 // Once done it clears the buffer and marks the job as "complete" 32 38 // Allowing the Job interface to abstract away the details of how buffered 33 39 // operations are stored and/or locked 34 - FlushBufferedOps(ctx context.Context, cb func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error 40 + FlushBufferedOps(ctx context.Context, cb func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error 35 41 36 42 ClearBufferedOps(ctx context.Context) error 37 43 } ··· 40 46 type Store interface { 41 47 // BufferOp buffers an operation for a job and returns true if the operation was buffered 42 48 // If the operation was not buffered, it returns false and an error (ErrJobNotFound or ErrJobComplete) 43 - BufferOp(ctx context.Context, repo string, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error) 44 49 GetJob(ctx context.Context, repo string) (Job, error) 45 50 GetNextEnqueuedJob(ctx context.Context) (Job, error) 51 + UpdateRev(ctx context.Context, repo, rev string) error 52 + 53 + EnqueueJob(ctx context.Context, repo string) error 46 54 } 47 55 48 56 // Backfiller is a struct which handles backfilling a repo 49 57 type Backfiller struct { 50 58 Name string 51 - HandleCreateRecord func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error 52 - HandleUpdateRecord func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error 59 + HandleCreateRecord func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error 60 + HandleUpdateRecord func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error 53 61 HandleDeleteRecord func(ctx context.Context, repo string, path string) error 54 62 Store Store 55 63 ··· 84 92 85 93 // ErrJobNotFound is returned when trying to buffer an op for a job that doesn't exist 86 94 var ErrJobNotFound = errors.New("job not found") 95 + 96 + // ErrEventGap is returned when an event is received with a since that doesn't match the current rev 97 + var ErrEventGap = fmt.Errorf("buffered event revs did not line up") 87 98 88 99 var tracer = otel.Tracer("backfiller") 89 100 ··· 109 120 func NewBackfiller( 110 121 name string, 111 122 store Store, 112 - handleCreate func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error, 113 - handleUpdate func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error, 123 + handleCreate func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error, 124 + handleUpdate func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error, 114 125 handleDelete func(ctx context.Context, repo string, path string) error, 115 126 opts *BackfillOptions, 116 127 ) *Backfiller { ··· 199 210 200 211 // Flush buffered operations, clear the buffer, and mark the job as "complete" 201 212 // Clearning and marking are handled by the job interface 202 - err := job.FlushBufferedOps(ctx, func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error { 213 + err := job.FlushBufferedOps(ctx, func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error { 203 214 switch repomgr.EventKind(kind) { 204 215 case repomgr.EvtKindCreateRecord: 205 216 err := b.HandleCreateRecord(ctx, repo, path, rec, cid) ··· 223 234 }) 224 235 if err != nil { 225 236 log.Error("failed to flush buffered ops", "error", err) 237 + if errors.Is(err, ErrEventGap) { 238 + if sserr := job.SetState(ctx, StateEnqueued); sserr != nil { 239 + log.Error("failed to reset job state after failed buffer flush", "error", sserr) 240 + } 241 + // TODO: need to re-queue this job for later 242 + return processed 243 + } 226 244 } 227 245 228 246 // Mark the job as "complete" ··· 254 272 repoDid := job.Repo() 255 273 256 274 log := slog.With("source", "backfiller_backfill_repo", "repo", repoDid) 275 + if job.RetryCount() > 0 { 276 + log = log.With("retry_count", job.RetryCount()) 277 + } 257 278 log.Info(fmt.Sprintf("processing backfill for %s", repoDid)) 258 279 259 280 url := fmt.Sprintf("%s?did=%s", b.CheckoutPath, repoDid) 260 281 282 + if job.Rev() != "" { 283 + url = url + fmt.Sprintf("&since=%s", job.Rev()) 284 + } 285 + 261 286 // GET and CAR decode the body 262 287 client := &http.Client{ 263 288 Transport: otelhttp.NewTransport(http.DefaultTransport), ··· 374 399 continue 375 400 } 376 401 377 - err = b.HandleCreateRecord(ctx, repoDid, item.recordPath, &recM, &item.nodeCid) 402 + err = b.HandleCreateRecord(ctx, repoDid, item.recordPath, recM, &item.nodeCid) 378 403 if err != nil { 379 404 recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)} 380 405 continue ··· 402 427 close(recordResults) 403 428 resultWG.Wait() 404 429 430 + if err := job.SetRev(ctx, r.SignedCommit().Rev); err != nil { 431 + log.Error("failed to update rev after backfilling repo", "err", err) 432 + } 433 + 405 434 // Process buffered operations, marking the job as "complete" when done 406 435 numProcessed := b.FlushBuffer(ctx, job) 407 436 ··· 411 440 "duration", time.Since(start), 412 441 ) 413 442 } 443 + 444 + func (bf *Backfiller) HandleEvent(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) error { 445 + r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 446 + if err != nil { 447 + return fmt.Errorf("failed to read event repo: %w", err) 448 + } 449 + 450 + var ops []*bufferedOp 451 + for _, op := range evt.Ops { 452 + switch op.Action { 453 + case "create", "update": 454 + cc, rec, err := r.GetRecord(ctx, op.Path) 455 + if err != nil { 456 + return fmt.Errorf("getting record failed (%s,%s): %w", op.Action, op.Path, err) 457 + } 458 + 459 + ops = append(ops, &bufferedOp{ 460 + kind: op.Action, 461 + path: op.Path, 462 + rec: rec, 463 + cid: &cc, 464 + }) 465 + case "delete": 466 + ops = append(ops, &bufferedOp{ 467 + kind: op.Action, 468 + path: op.Path, 469 + }) 470 + default: 471 + return fmt.Errorf("invalid op action: %q", op.Action) 472 + } 473 + } 474 + 475 + buffered, err := bf.BufferOps(ctx, evt.Repo, evt.Since, evt.Rev, ops) 476 + if err != nil { 477 + return fmt.Errorf("buffer ops failed: %w", err) 478 + } 479 + 480 + if buffered { 481 + return nil 482 + } 483 + 484 + for _, op := range ops { 485 + switch op.kind { 486 + case "create": 487 + if err := bf.HandleCreateRecord(ctx, evt.Repo, op.path, op.rec, op.cid); err != nil { 488 + return fmt.Errorf("create record failed: %w", err) 489 + } 490 + case "update": 491 + if err := bf.HandleUpdateRecord(ctx, evt.Repo, op.path, op.rec, op.cid); err != nil { 492 + return fmt.Errorf("update record failed: %w", err) 493 + } 494 + case "delete": 495 + if err := bf.HandleDeleteRecord(ctx, evt.Repo, op.path); err != nil { 496 + return fmt.Errorf("delete record failed: %w", err) 497 + } 498 + } 499 + } 500 + 501 + if err := bf.Store.UpdateRev(ctx, evt.Repo, evt.Rev); err != nil { 502 + return fmt.Errorf("failed to update rev: %w", err) 503 + } 504 + 505 + return nil 506 + } 507 + 508 + func (bf *Backfiller) BufferOp(ctx context.Context, repo string, since *string, rev, kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) (bool, error) { 509 + return bf.BufferOps(ctx, repo, since, rev, []*bufferedOp{{ 510 + path: path, 511 + kind: kind, 512 + rec: rec, 513 + cid: cid, 514 + }}) 515 + } 516 + 517 + func (bf *Backfiller) BufferOps(ctx context.Context, repo string, since *string, rev string, ops []*bufferedOp) (bool, error) { 518 + j, err := bf.Store.GetJob(ctx, repo) 519 + if err != nil { 520 + if !errors.Is(err, ErrJobNotFound) { 521 + return false, err 522 + } 523 + if qerr := bf.Store.EnqueueJob(ctx, repo); qerr != nil { 524 + return false, fmt.Errorf("failed to enqueue job for unknown repo: %w", qerr) 525 + } 526 + 527 + nj, err := bf.Store.GetJob(ctx, repo) 528 + if err != nil { 529 + return false, err 530 + } 531 + 532 + j = nj 533 + } 534 + 535 + return j.BufferOps(ctx, since, rev, ops) 536 + } 537 + 538 + // MaxRetries is the maximum number of times to retry a backfill job 539 + var MaxRetries = 10 540 + 541 + func computeExponentialBackoff(attempt int) time.Duration { 542 + return time.Duration(1<<uint(attempt)) * 10 * time.Second 543 + }
+20 -15
backfill/backfill_test.go
··· 5 5 "log/slog" 6 6 "sync" 7 7 "testing" 8 - "time" 9 8 10 - "github.com/bluesky-social/indigo/backfill" 11 9 "github.com/ipfs/go-cid" 12 10 typegen "github.com/whyrusleeping/cbor-gen" 13 11 ) ··· 20 18 } 21 19 22 20 func TestBackfill(t *testing.T) { 21 + /* this test depends on being able to hit the live production bgs... 23 22 ctx := context.Background() 24 23 25 24 testRepos := []string{ ··· 28 27 "did:plc:t7y4sud4dhptvzz7ibnv5cbt", 29 28 } 30 29 31 - mem := backfill.NewMemstore() 30 + db, err := gorm.Open(sqlite.Open("sqlite://:memory")) 31 + if err != nil { 32 + t.Fatal(err) 33 + } 34 + 35 + store := backfill.NewGormstore(db) 32 36 ts := &testState{} 33 37 34 38 opts := backfill.DefaultBackfillOptions() ··· 37 41 38 42 bf := backfill.NewBackfiller( 39 43 "backfill-test", 40 - mem, 44 + store, 41 45 ts.handleCreate, 42 46 ts.handleUpdate, 43 47 ts.handleDelete, ··· 49 53 go bf.Start() 50 54 51 55 for _, repo := range testRepos { 52 - mem.EnqueueJob(repo) 56 + store.EnqueueJob(repo) 53 57 } 54 58 55 59 // Wait until job 0 is in progress 56 60 for { 57 - s, err := mem.GetJob(ctx, testRepos[0]) 61 + s, err := store.GetJob(ctx, testRepos[0]) 58 62 if err != nil { 59 63 t.Fatal(err) 60 64 } 61 65 if s.State() == backfill.StateInProgress { 62 - mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/1", nil, &cid.Undef) 63 - mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/2", nil, &cid.Undef) 64 - mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/3", nil, &cid.Undef) 65 - mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/4", nil, &cid.Undef) 66 - mem.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/5", nil, &cid.Undef) 66 + bf.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/1", nil, &cid.Undef) 67 + bf.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/2", nil, &cid.Undef) 68 + bf.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/3", nil, &cid.Undef) 69 + bf.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/4", nil, &cid.Undef) 70 + bf.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/5", nil, &cid.Undef) 67 71 68 - mem.BufferOp(ctx, testRepos[0], "create", "app.bsky.feed.follow/1", nil, &cid.Undef) 72 + bf.BufferOp(ctx, testRepos[0], "create", "app.bsky.feed.follow/1", nil, &cid.Undef) 69 73 70 - mem.BufferOp(ctx, testRepos[0], "update", "app.bsky.feed.follow/1", nil, &cid.Undef) 74 + bf.BufferOp(ctx, testRepos[0], "update", "app.bsky.feed.follow/1", nil, &cid.Undef) 71 75 72 76 break 73 77 } ··· 87 91 bf.Stop() 88 92 89 93 slog.Info("shutting down") 94 + */ 90 95 } 91 96 92 - func (ts *testState) handleCreate(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error { 97 + func (ts *testState) handleCreate(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error { 93 98 slog.Info("got create", "repo", repo, "path", path) 94 99 ts.lk.Lock() 95 100 ts.creates++ ··· 97 102 return nil 98 103 } 99 104 100 - func (ts *testState) handleUpdate(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error { 105 + func (ts *testState) handleUpdate(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error { 101 106 slog.Info("got update", "repo", repo, "path", path) 102 107 ts.lk.Lock() 103 108 ts.updates++
+214 -73
backfill/gormstore.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "errors" 5 6 "fmt" 7 + "strings" 6 8 "sync" 7 9 "time" 8 10 ··· 12 14 ) 13 15 14 16 type Gormjob struct { 15 - repo string 16 - state string 17 + repo string 18 + state string 19 + rev string 20 + 17 21 lk sync.Mutex 18 - bufferedOps map[string][]*bufferedOp 22 + bufferedOps []*opSet 19 23 20 24 dbj *GormDBJob 21 25 db *gorm.DB 22 26 23 27 createdAt time.Time 24 28 updatedAt time.Time 29 + 30 + retryCount int 31 + retryAfter *time.Time 25 32 } 26 33 27 34 type GormDBJob struct { 28 35 gorm.Model 29 - Repo string `gorm:"unique;index"` 30 - State string `gorm:"index"` 36 + Repo string `gorm:"unique;index"` 37 + State string `gorm:"index"` 38 + Rev string 39 + RetryCount int 40 + RetryAfter *time.Time 31 41 } 32 42 33 43 // Gormstore is a gorm-backed implementation of the Backfill Store interface 34 44 type Gormstore struct { 35 45 lk sync.RWMutex 36 46 jobs map[string]*Gormjob 37 - db *gorm.DB 47 + 48 + qlk sync.Mutex 49 + taskQueue []string 50 + 51 + db *gorm.DB 38 52 } 39 53 40 54 func NewGormstore(db *gorm.DB) *Gormstore { ··· 45 59 } 46 60 47 61 func (s *Gormstore) LoadJobs(ctx context.Context) error { 48 - limit := 20_000 49 - offset := 0 50 - s.lk.Lock() 51 - defer s.lk.Unlock() 62 + s.qlk.Lock() 63 + defer s.qlk.Unlock() 64 + return s.loadJobs(ctx, 20_000) 65 + } 52 66 53 - for { 54 - var dbjobs []*GormDBJob 55 - // Load all jobs from the database 56 - if err := s.db.Limit(limit).Offset(offset).Find(&dbjobs).Error; err != nil { 57 - return err 58 - } 59 - if len(dbjobs) == 0 { 60 - break 61 - } 62 - offset += len(dbjobs) 67 + func (s *Gormstore) loadJobs(ctx context.Context, limit int) error { 68 + var todo []string 69 + if err := s.db.Model(GormDBJob{}).Limit(limit).Select("repo"). 70 + Where("state = 'enqueued' OR (state = 'failed' AND (retry_after = NULL OR retry_after < ?))", time.Now()).Scan(&todo).Error; err != nil { 71 + return err 72 + } 63 73 64 - // Convert them to in-memory jobs 65 - for i := range dbjobs { 66 - dbj := dbjobs[i] 67 - j := &Gormjob{ 68 - repo: dbj.Repo, 69 - state: dbj.State, 70 - bufferedOps: map[string][]*bufferedOp{}, 71 - createdAt: dbj.CreatedAt, 72 - updatedAt: dbj.UpdatedAt, 74 + s.taskQueue = append(s.taskQueue, todo...) 73 75 74 - dbj: dbj, 75 - db: s.db, 76 - } 77 - s.jobs[dbj.Repo] = j 78 - } 76 + return nil 77 + } 78 + 79 + func (s *Gormstore) GetOrCreateJob(ctx context.Context, repo, state string) (Job, error) { 80 + j, err := s.getJob(ctx, repo) 81 + if err == nil { 82 + return j, nil 79 83 } 80 84 85 + if !errors.Is(err, ErrJobNotFound) { 86 + return nil, err 87 + } 88 + 89 + if err := s.createJobForRepo(repo, state); err != nil { 90 + return nil, err 91 + } 92 + 93 + return s.getJob(ctx, repo) 94 + } 95 + 96 + func (s *Gormstore) EnqueueJob(ctx context.Context, repo string) error { 97 + _, err := s.GetOrCreateJob(ctx, repo, StateEnqueued) 98 + if err != nil { 99 + return err 100 + } 101 + 102 + s.qlk.Lock() 103 + s.taskQueue = append(s.taskQueue, repo) 104 + s.qlk.Unlock() 105 + 81 106 return nil 82 107 } 83 108 84 - func (s *Gormstore) EnqueueJob(repo string) error { 85 - // Persist the job to the database 109 + func (s *Gormstore) createJobForRepo(repo, state string) error { 86 110 dbj := &GormDBJob{ 87 111 Repo: repo, 88 112 State: StateEnqueued, ··· 104 128 } 105 129 106 130 j := &Gormjob{ 107 - repo: repo, 108 - createdAt: time.Now(), 109 - updatedAt: time.Now(), 110 - state: StateEnqueued, 111 - bufferedOps: map[string][]*bufferedOp{}, 131 + repo: repo, 132 + createdAt: time.Now(), 133 + updatedAt: time.Now(), 134 + state: state, 112 135 113 136 dbj: dbj, 114 137 db: s.db, ··· 118 141 return nil 119 142 } 120 143 121 - func (s *Gormstore) BufferOp(ctx context.Context, repo, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error) { 122 - s.lk.RLock() 123 - 124 - // If the job doesn't exist, we can't buffer an op for it 125 - j, ok := s.jobs[repo] 126 - s.lk.RUnlock() 127 - if !ok { 128 - return false, ErrJobNotFound 129 - } 130 - 144 + func (j *Gormjob) BufferOps(ctx context.Context, since *string, rev string, ops []*bufferedOp) (bool, error) { 131 145 j.lk.Lock() 132 146 defer j.lk.Unlock() 133 147 134 148 switch j.state { 135 149 case StateComplete: 136 - return false, ErrJobComplete 137 - case StateInProgress: 138 - // keep going and buffer the op 150 + return false, nil 151 + case StateInProgress, StateEnqueued: 152 + // keep going and buffer the op 139 153 default: 140 - return false, nil 154 + return false, fmt.Errorf("invalid job state: %q", j.state) 141 155 } 142 156 143 - j.bufferedOps[path] = append(j.bufferedOps[path], &bufferedOp{ 144 - kind: kind, 145 - rec: rec, 146 - cid: cid, 147 - }) 148 - j.updatedAt = time.Now() 157 + j.bufferOps(&opSet{since: since, rev: rev, ops: ops}) 149 158 return true, nil 150 159 } 151 160 161 + func (j *Gormjob) bufferOps(ops *opSet) { 162 + j.bufferedOps = append(j.bufferedOps, ops) 163 + j.updatedAt = time.Now() 164 + } 165 + 152 166 func (s *Gormstore) GetJob(ctx context.Context, repo string) (Job, error) { 167 + return s.getJob(ctx, repo) 168 + } 169 + 170 + func (s *Gormstore) getJob(ctx context.Context, repo string) (*Gormjob, error) { 171 + cj := s.checkJobCache(ctx, repo) 172 + if cj != nil { 173 + return cj, nil 174 + } 175 + 176 + return s.loadJob(ctx, repo) 177 + } 178 + 179 + func (s *Gormstore) loadJob(ctx context.Context, repo string) (*Gormjob, error) { 180 + var dbj GormDBJob 181 + if err := s.db.Find(&dbj, "repo = ?", repo).Error; err != nil { 182 + return nil, err 183 + } 184 + 185 + if dbj.ID == 0 { 186 + return nil, ErrJobNotFound 187 + } 188 + 189 + j := &Gormjob{ 190 + repo: dbj.Repo, 191 + state: dbj.State, 192 + createdAt: dbj.CreatedAt, 193 + updatedAt: dbj.UpdatedAt, 194 + 195 + dbj: &dbj, 196 + db: s.db, 197 + 198 + retryCount: dbj.RetryCount, 199 + retryAfter: dbj.RetryAfter, 200 + } 201 + s.lk.Lock() 202 + defer s.lk.Unlock() 203 + // would imply a race condition 204 + exist, ok := s.jobs[repo] 205 + if ok { 206 + return exist, nil 207 + } 208 + s.jobs[repo] = j 209 + return j, nil 210 + } 211 + 212 + func (s *Gormstore) checkJobCache(ctx context.Context, repo string) *Gormjob { 153 213 s.lk.RLock() 154 214 defer s.lk.RUnlock() 155 215 156 216 j, ok := s.jobs[repo] 157 217 if !ok || j == nil { 158 - return nil, nil 218 + return nil 159 219 } 160 - return j, nil 220 + return j 161 221 } 162 222 163 223 func (s *Gormstore) GetNextEnqueuedJob(ctx context.Context) (Job, error) { 164 - s.lk.RLock() 165 - defer s.lk.RUnlock() 224 + s.qlk.Lock() 225 + defer s.qlk.Unlock() 226 + if len(s.taskQueue) == 0 { 227 + if err := s.loadJobs(ctx, 1000); err != nil { 228 + return nil, err 229 + } 166 230 167 - for _, j := range s.jobs { 168 - if j.State() == StateEnqueued { 231 + if len(s.taskQueue) == 0 { 232 + return nil, nil 233 + } 234 + } 235 + 236 + for len(s.taskQueue) > 0 { 237 + first := s.taskQueue[0] 238 + s.taskQueue = s.taskQueue[1:] 239 + 240 + j, err := s.getJob(ctx, first) 241 + if err != nil { 242 + return nil, err 243 + } 244 + 245 + shouldRetry := strings.HasPrefix(j.State(), "failed") && j.retryAfter != nil && time.Now().After(*j.retryAfter) 246 + 247 + if j.State() == StateEnqueued || shouldRetry { 169 248 return j, nil 170 249 } 171 250 } ··· 183 262 return j.state 184 263 } 185 264 265 + func (j *Gormjob) SetRev(ctx context.Context, r string) error { 266 + j.lk.Lock() 267 + defer j.lk.Unlock() 268 + 269 + j.rev = r 270 + j.updatedAt = time.Now() 271 + 272 + // Persist the job to the database 273 + j.dbj.Rev = r 274 + return j.db.Save(j.dbj).Error 275 + } 276 + 277 + func (j *Gormjob) Rev() string { 278 + j.lk.Lock() 279 + defer j.lk.Unlock() 280 + 281 + return j.rev 282 + } 283 + 186 284 func (j *Gormjob) SetState(ctx context.Context, state string) error { 187 285 j.lk.Lock() 188 286 defer j.lk.Unlock() ··· 190 288 j.state = state 191 289 j.updatedAt = time.Now() 192 290 291 + if strings.HasPrefix(state, "failed") { 292 + if j.retryCount < MaxRetries { 293 + next := time.Now().Add(computeExponentialBackoff(j.retryCount)) 294 + j.retryAfter = &next 295 + j.retryCount++ 296 + } else { 297 + j.retryAfter = nil 298 + } 299 + } 300 + 193 301 // Persist the job to the database 194 302 j.dbj.State = state 195 303 return j.db.Save(j.dbj).Error 196 304 } 197 305 198 - func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error { 306 + func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error { 307 + // TODO: this will block any events for this repo while this flush is ongoing, is that okay? 199 308 j.lk.Lock() 200 309 defer j.lk.Unlock() 201 310 202 - for path, ops := range j.bufferedOps { 203 - for _, op := range ops { 204 - if err := fn(op.kind, path, op.rec, op.cid); err != nil { 311 + for _, opset := range j.bufferedOps { 312 + if opset.rev < j.rev { 313 + // stale events, skip 314 + continue 315 + } 316 + 317 + if opset.since == nil { 318 + // TODO: what does this mean? 319 + return fmt.Errorf("nil since in event after backfill: %w", ErrEventGap) 320 + } 321 + 322 + if j.rev != *opset.since { 323 + // we've got a discontinuity 324 + return fmt.Errorf("event since did not match current rev (%s != %s): %w", *opset.since, j.rev, ErrEventGap) 325 + } 326 + 327 + for _, op := range opset.ops { 328 + if err := fn(op.kind, op.path, op.rec, op.cid); err != nil { 205 329 return err 206 330 } 207 331 } 332 + 333 + j.rev = opset.rev 208 334 } 209 335 210 - j.bufferedOps = map[string][]*bufferedOp{} 336 + j.bufferedOps = []*opSet{} 211 337 j.state = StateComplete 212 338 213 339 return nil ··· 217 343 j.lk.Lock() 218 344 defer j.lk.Unlock() 219 345 220 - j.bufferedOps = map[string][]*bufferedOp{} 346 + j.bufferedOps = []*opSet{} 221 347 j.updatedAt = time.Now() 222 348 return nil 223 349 } 350 + 351 + func (j *Gormjob) RetryCount() int { 352 + j.lk.Lock() 353 + defer j.lk.Unlock() 354 + return j.retryCount 355 + } 356 + 357 + func (s *Gormstore) UpdateRev(ctx context.Context, repo, rev string) error { 358 + j, err := s.GetJob(ctx, repo) 359 + if err != nil { 360 + return err 361 + } 362 + 363 + return j.SetRev(ctx, rev) 364 + }
+76 -24
backfill/memstore.go
··· 12 12 13 13 type bufferedOp struct { 14 14 kind string 15 - rec *typegen.CBORMarshaler 15 + path string 16 + rec typegen.CBORMarshaler 16 17 cid *cid.Cid 17 18 } 18 19 20 + type opSet struct { 21 + since *string 22 + rev string 23 + ops []*bufferedOp 24 + } 25 + 19 26 type Memjob struct { 20 27 repo string 21 28 state string 29 + rev string 22 30 lk sync.Mutex 23 - bufferedOps map[string][]*bufferedOp 31 + bufferedOps []*opSet 24 32 25 33 createdAt time.Time 26 34 updatedAt time.Time ··· 47 55 } 48 56 49 57 j := &Memjob{ 50 - repo: repo, 51 - createdAt: time.Now(), 52 - updatedAt: time.Now(), 53 - state: StateEnqueued, 54 - bufferedOps: map[string][]*bufferedOp{}, 58 + repo: repo, 59 + createdAt: time.Now(), 60 + updatedAt: time.Now(), 61 + state: StateEnqueued, 55 62 } 56 63 s.jobs[repo] = j 57 64 return nil 58 65 } 59 66 60 - func (s *Memstore) BufferOp(ctx context.Context, repo, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error) { 67 + func (s *Memstore) BufferOp(ctx context.Context, repo string, since *string, rev, kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) (bool, error) { 61 68 s.lk.Lock() 62 69 63 70 // If the job doesn't exist, we can't buffer an op for it ··· 79 86 return false, nil 80 87 } 81 88 82 - j.bufferedOps[path] = append(j.bufferedOps[path], &bufferedOp{ 83 - kind: kind, 84 - rec: rec, 85 - cid: cid, 89 + j.bufferedOps = append(j.bufferedOps, &opSet{ 90 + since: since, 91 + rev: rev, 92 + ops: []*bufferedOp{&bufferedOp{ 93 + path: path, 94 + kind: kind, 95 + rec: rec, 96 + cid: cid, 97 + }}, 98 + }) 99 + j.updatedAt = time.Now() 100 + return true, nil 101 + } 102 + 103 + func (j *Memjob) BufferOps(ctx context.Context, since *string, rev string, ops []*bufferedOp) (bool, error) { 104 + j.lk.Lock() 105 + defer j.lk.Unlock() 106 + 107 + switch j.state { 108 + case StateComplete: 109 + return false, ErrJobComplete 110 + case StateInProgress: 111 + // keep going and buffer the op 112 + default: 113 + return false, nil 114 + } 115 + 116 + j.bufferedOps = append(j.bufferedOps, &opSet{ 117 + since: since, 118 + rev: rev, 119 + ops: ops, 86 120 }) 87 121 j.updatedAt = time.Now() 88 122 return true, nil ··· 131 165 return nil 132 166 } 133 167 134 - func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error { 135 - j.lk.Lock() 136 - defer j.lk.Unlock() 168 + func (j *Memjob) Rev() string { 169 + return j.rev 170 + } 171 + 172 + func (j *Memjob) SetRev(ctx context.Context, rev string) error { 173 + j.rev = rev 174 + return nil 175 + } 137 176 138 - for path, ops := range j.bufferedOps { 139 - for _, op := range ops { 140 - if err := fn(op.kind, path, op.rec, op.cid); err != nil { 141 - return err 177 + func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error { 178 + panic("TODO: copy what we end up doing from the gormstore") 179 + /* 180 + j.lk.Lock() 181 + defer j.lk.Unlock() 182 + 183 + for _, opset := range j.bufferedOps { 184 + for _, op := range opset.ops { 185 + if err := fn(op.kind, op.path, op.rec, op.cid); err != nil { 186 + return err 187 + } 142 188 } 143 189 } 144 - } 145 190 146 - j.bufferedOps = map[string][]*bufferedOp{} 147 - j.state = StateComplete 191 + j.bufferedOps = map[string][]*bufferedOp{} 192 + j.state = StateComplete 148 193 149 - return nil 194 + return nil 195 + */ 150 196 } 151 197 152 198 func (j *Memjob) ClearBufferedOps(ctx context.Context) error { 153 199 j.lk.Lock() 154 200 defer j.lk.Unlock() 155 201 156 - j.bufferedOps = map[string][]*bufferedOp{} 202 + j.bufferedOps = []*opSet{} 157 203 j.updatedAt = time.Now() 158 204 return nil 159 205 } 206 + 207 + func (j *Memjob) RetryCount() int { 208 + j.lk.Lock() 209 + defer j.lk.Unlock() 210 + return 0 211 + }
+1 -1
bgs/bgs.go
··· 851 851 } 852 852 853 853 // skip the fast path for rebases or if the user is already in the slow path 854 - if bgs.Index.Crawler.RepoInSlowPath(ctx, host, u.ID) { 854 + if bgs.Index.Crawler.RepoInSlowPath(ctx, u.ID) { 855 855 rebasesCounter.WithLabelValues(host.Host).Add(1) 856 856 ai, err := bgs.Index.LookupUser(ctx, u.ID) 857 857 if err != nil {
+1
cmd/gosky/admin.go
··· 766 766 } 767 767 768 768 xrpcc.AdminToken = &adminKey 769 + 769 770 resp, err := comatproto.ServerCreateInviteCodes(context.TODO(), xrpcc, &comatproto.ServerCreateInviteCodes_Input{ 770 771 UseCount: int64(count), 771 772 ForAccounts: usrdid,
+1 -1
indexer/crawler.go
··· 256 256 } 257 257 } 258 258 259 - func (c *CrawlDispatcher) RepoInSlowPath(ctx context.Context, host *models.PDS, uid models.Uid) bool { 259 + func (c *CrawlDispatcher) RepoInSlowPath(ctx context.Context, uid models.Uid) bool { 260 260 c.maplk.Lock() 261 261 defer c.maplk.Unlock() 262 262 if _, ok := c.todo[uid]; ok {
+11 -135
search/firehose.go
··· 15 15 "github.com/bluesky-social/indigo/backfill" 16 16 "github.com/bluesky-social/indigo/events" 17 17 "github.com/bluesky-social/indigo/events/schedulers/autoscaling" 18 - lexutil "github.com/bluesky-social/indigo/lex/util" 19 18 "github.com/bluesky-social/indigo/repo" 20 - "github.com/bluesky-social/indigo/repomgr" 21 19 22 20 "github.com/carlmjohnson/versioninfo" 23 21 "github.com/gorilla/websocket" ··· 101 99 return nil 102 100 } 103 101 104 - // Check if we've backfilled this repo, if not, we should enqueue it 105 - job, err := s.bfs.GetJob(ctx, evt.Repo) 106 - if job == nil && err == nil { 107 - logEvt.Info("enqueueing backfill job for new repo") 108 - if err := s.bfs.EnqueueJob(evt.Repo); err != nil { 109 - logEvt.Warn("failed to enqueue backfill job", "err", err) 110 - } 111 - } 112 - 113 - r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 114 - if err != nil { 115 - // TODO: handle this case (instead of return nil) 116 - logEvt.Error("reading repo from car", "size_bytes", len(evt.Blocks), "err", err) 117 - return nil 118 - } 119 - 120 - for _, op := range evt.Ops { 121 - ek := repomgr.EventKind(op.Action) 122 - logOp := logEvt.With("op_path", op.Path, "op_cid", op.Cid) 123 - switch ek { 124 - case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 125 - rc, rec, err := r.GetRecord(ctx, op.Path) 126 - if err != nil { 127 - // TODO: handle this case (instead of return nil) 128 - logOp.Error("fetching record from event CAR slice", "err", err) 129 - return nil 130 - } 131 - 132 - if lexutil.LexLink(rc) != *op.Cid { 133 - // TODO: handle this case (instead of return nil) 134 - logOp.Error("mismatch in record and op cid", "record_cid", rc) 135 - return nil 136 - } 137 - 138 - if strings.HasPrefix(op.Path, "app.bsky.feed.post") { 139 - postsReceived.Inc() 140 - } else if strings.HasPrefix(op.Path, "app.bsky.actor.profile") { 141 - profilesReceived.Inc() 142 - } 143 - 144 - if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, &rc, rec); err != nil { 145 - // TODO: handle this case (instead of return nil) 146 - logOp.Error("failed to handle event op", "err", err) 147 - return nil 148 - } 149 - 150 - case repomgr.EvtKindDeleteRecord: 151 - if err := s.handleOp(ctx, ek, evt.Seq, op.Path, evt.Repo, nil, nil); err != nil { 152 - // TODO: handle this case (instead of return nil) 153 - logOp.Error("failed to handle delete", "err", err) 154 - return nil 155 - } 156 - } 102 + // Pass events to the backfiller which will process or buffer as needed 103 + if err := s.bf.HandleEvent(ctx, evt); err != nil { 104 + logEvt.Error("failed to handle event", "err", err) 157 105 } 158 106 159 107 return nil ··· 194 142 cursor := "" 195 143 limit := int64(500) 196 144 197 - totalEnqueued := 0 198 - totalSkipped := 0 145 + total := 0 199 146 totalErrored := 0 200 147 201 148 for { ··· 206 153 continue 207 154 } 208 155 log.Info("got repo page", "count", len(resp.Repos), "cursor", resp.Cursor) 209 - enqueued := 0 210 - skipped := 0 211 156 errored := 0 212 157 for _, repo := range resp.Repos { 213 - job, err := s.bfs.GetJob(ctx, repo.Did) 214 - if job == nil && err == nil { 215 - log.Info("enqueuing backfill job for new repo", "did", repo.Did) 216 - if err := s.bfs.EnqueueJob(repo.Did); err != nil { 217 - log.Warn("failed to enqueue backfill job", "err", err) 218 - errored++ 219 - continue 220 - } 221 - enqueued++ 222 - } else if err != nil { 223 - log.Warn("failed to get backfill job", "did", repo.Did, "err", err) 158 + _, err := s.bfs.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued) 159 + if err != nil { 160 + log.Error("failed to get or create job", "did", repo.Did, "err", err) 224 161 errored++ 225 - } else { 226 - skipped++ 227 162 } 228 163 } 229 - log.Info("enqueued repos", "enqueued", enqueued, "skipped", skipped, "errored", errored) 230 - totalEnqueued += enqueued 231 - totalSkipped += skipped 164 + log.Info("enqueued repos", "total", len(resp.Repos), "errored", errored) 232 165 totalErrored += errored 166 + total += len(resp.Repos) 233 167 if resp.Cursor != nil && *resp.Cursor != "" { 234 168 cursor = *resp.Cursor 235 169 } else { ··· 237 171 } 238 172 } 239 173 240 - log.Info("finished repo discovery", "totalEnqueued", totalEnqueued, "totalSkipped", totalSkipped, "totalErrored", totalErrored) 174 + log.Info("finished repo discovery", "totalJobs", total, "totalErrored", totalErrored) 241 175 } 242 176 243 - func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, path string, recP *typegen.CBORMarshaler, rcid *cid.Cid) error { 177 + func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, path string, rec typegen.CBORMarshaler, rcid *cid.Cid) error { 244 178 // Since this gets called in a backfill job, we need to check if the path is a post or profile 245 179 if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") { 246 180 return nil ··· 258 192 if ident == nil { 259 193 return fmt.Errorf("identity not found for did: %s", did.String()) 260 194 } 261 - rec := *recP 262 195 263 196 switch rec := rec.(type) { 264 197 case *bsky.FeedPost: ··· 306 239 postsDeleted.Inc() 307 240 case strings.Contains(path, "app.bsky.actor.profile"): 308 241 // profilesDeleted.Inc() 309 - } 310 - 311 - return nil 312 - } 313 - 314 - func (s *Server) handleOp(ctx context.Context, op repomgr.EventKind, seq int64, path string, did string, rcid *cid.Cid, rec typegen.CBORMarshaler) error { 315 - var err error 316 - if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") { 317 - return nil 318 - } 319 - 320 - if op == repomgr.EvtKindCreateRecord || op == repomgr.EvtKindUpdateRecord { 321 - s.logger.Debug("processing create record op", "seq", seq, "did", did, "path", path) 322 - 323 - // Try to buffer the op, if it fails, we need to create a backfill job 324 - _, err := s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid) 325 - if err == backfill.ErrJobNotFound { 326 - s.logger.Debug("no backfill job found for repo, creating one", "did", did) 327 - 328 - if err := s.bfs.EnqueueJob(did); err != nil { 329 - return fmt.Errorf("enqueueing backfill job: %w", err) 330 - } 331 - 332 - // Try to buffer the op again so it gets picked up by the backfill job 333 - _, err = s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid) 334 - if err != nil { 335 - return fmt.Errorf("buffering backfill op: %w", err) 336 - } 337 - } else if err == backfill.ErrJobComplete { 338 - // Backfill is done for this repo so we can just index it now 339 - err = s.handleCreateOrUpdate(ctx, did, path, &rec, rcid) 340 - } 341 - } else if op == repomgr.EvtKindDeleteRecord { 342 - s.logger.Debug("processing delete record op", "seq", seq, "did", did, "path", path) 343 - 344 - // Try to buffer the op, if it fails, we need to create a backfill job 345 - _, err := s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid) 346 - if err == backfill.ErrJobNotFound { 347 - s.logger.Debug("no backfill job found for repo, creating one", "did", did) 348 - 349 - if err := s.bfs.EnqueueJob(did); err != nil { 350 - return fmt.Errorf("enqueueing backfill job: %w", err) 351 - } 352 - 353 - // Try to buffer the op again so it gets picked up by the backfill job 354 - _, err = s.bfs.BufferOp(ctx, did, string(op), path, &rec, rcid) 355 - if err != nil { 356 - return fmt.Errorf("buffering backfill op: %w", err) 357 - } 358 - } else if err == backfill.ErrJobComplete { 359 - // Backfill is done for this repo so we can delete imemdiately 360 - err = s.handleDelete(ctx, did, path) 361 - } 362 - } 363 - 364 - if err != nil { 365 - return fmt.Errorf("failed to handle op: %w", err) 366 242 } 367 243 368 244 return nil
+1 -1
search/handlers.go
··· 171 171 for _, did := range dids { 172 172 job, err := s.bfs.GetJob(ctx, did) 173 173 if job == nil && err == nil { 174 - err := s.bfs.EnqueueJob(did) 174 + err := s.bfs.EnqueueJob(ctx, did) 175 175 if err != nil { 176 176 errs = append(errs, IndexError{ 177 177 DID: did,