this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Pass rev to operation handlers in backfill library (#488)

authored by

Jaz and committed by
GitHub
c0558132 ad730a7d

+22 -20
+17 -15
backfill/backfill.go
··· 37 37 // Once done it clears the buffer and marks the job as "complete" 38 38 // Allowing the Job interface to abstract away the details of how buffered 39 39 // operations are stored and/or locked 40 - FlushBufferedOps(ctx context.Context, cb func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error 40 + FlushBufferedOps(ctx context.Context, cb func(kind, rev, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error 41 41 42 42 ClearBufferedOps(ctx context.Context) error 43 43 } ··· 56 56 // Backfiller is a struct which handles backfilling a repo 57 57 type Backfiller struct { 58 58 Name string 59 - HandleCreateRecord func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error 60 - HandleUpdateRecord func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error 61 - HandleDeleteRecord func(ctx context.Context, repo string, path string) error 59 + HandleCreateRecord func(ctx context.Context, repo string, rev string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error 60 + HandleUpdateRecord func(ctx context.Context, repo string, rev string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error 61 + HandleDeleteRecord func(ctx context.Context, repo string, rev string, path string) error 62 62 Store Store 63 63 64 64 // Number of backfills to process in parallel ··· 120 120 func NewBackfiller( 121 121 name string, 122 122 store Store, 123 - handleCreate func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error, 124 - handleUpdate func(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error, 125 - handleDelete func(ctx context.Context, repo string, path string) error, 123 + handleCreate func(ctx context.Context, repo string, rev string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error, 124 + handleUpdate func(ctx context.Context, repo string, rev string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error, 125 + handleDelete func(ctx context.Context, repo string, rev string, path string) error, 126 126 opts *BackfillOptions, 127 127 ) *Backfiller { 128 128 if opts == nil { ··· 210 210 211 211 // Flush buffered operations, clear the buffer, and mark the job as "complete" 212 212 // Clearning and marking are handled by the job interface 213 - err := job.FlushBufferedOps(ctx, func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error { 213 + err := job.FlushBufferedOps(ctx, func(kind, rev, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error { 214 214 switch repomgr.EventKind(kind) { 215 215 case repomgr.EvtKindCreateRecord: 216 - err := b.HandleCreateRecord(ctx, repo, path, rec, cid) 216 + err := b.HandleCreateRecord(ctx, repo, rev, path, rec, cid) 217 217 if err != nil { 218 218 log.Error("failed to handle create record", "error", err) 219 219 } 220 220 case repomgr.EvtKindUpdateRecord: 221 - err := b.HandleUpdateRecord(ctx, repo, path, rec, cid) 221 + err := b.HandleUpdateRecord(ctx, repo, rev, path, rec, cid) 222 222 if err != nil { 223 223 log.Error("failed to handle update record", "error", err) 224 224 } 225 225 case repomgr.EvtKindDeleteRecord: 226 - err := b.HandleDeleteRecord(ctx, repo, path) 226 + err := b.HandleDeleteRecord(ctx, repo, rev, path) 227 227 if err != nil { 228 228 log.Error("failed to handle delete record", "error", err) 229 229 } ··· 375 375 log.Error("failed to iterated records in repo", "err", err) 376 376 } 377 377 }() 378 + 379 + rev := r.SignedCommit().Rev 378 380 379 381 // Consumer routines 380 382 for i := 0; i < numRoutines; i++ { ··· 399 401 continue 400 402 } 401 403 402 - err = b.HandleCreateRecord(ctx, repoDid, item.recordPath, recM, &item.nodeCid) 404 + err = b.HandleCreateRecord(ctx, repoDid, rev, item.recordPath, recM, &item.nodeCid) 403 405 if err != nil { 404 406 recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)} 405 407 continue ··· 509 511 for _, op := range ops { 510 512 switch op.kind { 511 513 case "create": 512 - if err := bf.HandleCreateRecord(ctx, evt.Repo, op.path, op.rec, op.cid); err != nil { 514 + if err := bf.HandleCreateRecord(ctx, evt.Repo, evt.Rev, op.path, op.rec, op.cid); err != nil { 513 515 return fmt.Errorf("create record failed: %w", err) 514 516 } 515 517 case "update": 516 - if err := bf.HandleUpdateRecord(ctx, evt.Repo, op.path, op.rec, op.cid); err != nil { 518 + if err := bf.HandleUpdateRecord(ctx, evt.Repo, evt.Rev, op.path, op.rec, op.cid); err != nil { 517 519 return fmt.Errorf("update record failed: %w", err) 518 520 } 519 521 case "delete": 520 - if err := bf.HandleDeleteRecord(ctx, evt.Repo, op.path); err != nil { 522 + if err := bf.HandleDeleteRecord(ctx, evt.Repo, evt.Rev, op.path); err != nil { 521 523 return fmt.Errorf("delete record failed: %w", err) 522 524 } 523 525 }
+2 -2
backfill/gormstore.go
··· 303 303 return j.db.Save(j.dbj).Error 304 304 } 305 305 306 - func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error { 306 + func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, rev, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error { 307 307 // TODO: this will block any events for this repo while this flush is ongoing, is that okay? 308 308 j.lk.Lock() 309 309 defer j.lk.Unlock() ··· 325 325 } 326 326 327 327 for _, op := range opset.ops { 328 - if err := fn(op.kind, op.path, op.rec, op.cid); err != nil { 328 + if err := fn(op.kind, opset.rev, op.path, op.rec, op.cid); err != nil { 329 329 return err 330 330 } 331 331 }
+1 -1
backfill/memstore.go
··· 174 174 return nil 175 175 } 176 176 177 - func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error { 177 + func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind, rev, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error) error { 178 178 panic("TODO: copy what we end up doing from the gormstore") 179 179 /* 180 180 j.lk.Lock()
+2 -2
search/firehose.go
··· 174 174 log.Info("finished repo discovery", "totalJobs", total, "totalErrored", totalErrored) 175 175 } 176 176 177 - func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, path string, rec typegen.CBORMarshaler, rcid *cid.Cid) error { 177 + func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, rev string, path string, rec typegen.CBORMarshaler, rcid *cid.Cid) error { 178 178 // Since this gets called in a backfill job, we need to check if the path is a post or profile 179 179 if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") { 180 180 return nil ··· 211 211 return nil 212 212 } 213 213 214 - func (s *Server) handleDelete(ctx context.Context, rawDID, path string) error { 214 + func (s *Server) handleDelete(ctx context.Context, rawDID, rev, path string) error { 215 215 // Since this gets called in a backfill job, we need to check if the path is a post or profile 216 216 if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") { 217 217 return nil