this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix cleanup of rebases in carstore (#222)

This fixes two things, first the test-pds didnt properly implement
rebases which caused some issues in tests.
The second thing was that old blockRefs werent getting deleted after the
rebase, despite their associated CarShards getting deleted.

authored by

Whyrusleeping and committed by
GitHub
47aaeaab 5e72cf75

+212 -44
+8
bgs/bgs.go
··· 711 711 return err 712 712 } 713 713 714 + // TODO: we currently do not handle events that get queued up 715 + // behind an already 'in progress' slow path event. 716 + // this is strictly less efficient than it could be, and while it 717 + // does 'work' (due to falling back to resyncing the repo), its 718 + // technically incorrect. Now that we have the parallel event 719 + // processor coming off of the pds stream, we should investigate 720 + // whether or not we even need this 'slow path' logic, as it makes 721 + // accounting for which events have been processed much harder 714 722 return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt) 715 723 } 716 724
+4
carstore/bs.go
··· 743 743 if err := ds.cs.meta.Delete(&sl).Error; err != nil { 744 744 return err 745 745 } 746 + 747 + if err := ds.cs.meta.Where("shard = ?", sl.ID).Delete(&blockRef{}).Error; err != nil { 748 + return err 749 + } 746 750 } 747 751 748 752 return nil
+39 -31
indexer/crawler.go
··· 16 16 17 17 repoSync chan *crawlWork 18 18 19 - catchup chan *catchupJob 19 + catchup chan *crawlWork 20 20 21 21 complete chan models.Uid 22 22 ··· 38 38 ingest: make(chan *models.ActorInfo), 39 39 repoSync: make(chan *crawlWork), 40 40 complete: make(chan models.Uid), 41 - catchup: make(chan *catchupJob), 41 + catchup: make(chan *crawlWork), 42 42 doRepoCrawl: repoFn, 43 43 concurrency: concurrency, 44 44 todo: make(map[models.Uid]*crawlWork), ··· 121 121 next = nil 122 122 rs = nil 123 123 } 124 - case catchup := <-c.catchup: 125 - c.maplk.Lock() 126 - job, ok := c.todo[catchup.user.Uid] 127 - // TODO: in the event of receiving a rebase event, we *could* pre-empt all other pending events 128 - if ok { 129 - job.catchup = append(job.catchup, catchup) 130 - c.maplk.Unlock() 131 - break 132 - } 133 - 134 - job, ok = c.inProgress[catchup.user.Uid] 135 - if ok { 136 - job.next = append(job.next, catchup) 137 - c.maplk.Unlock() 138 - break 139 - } 140 - 141 - cw := &crawlWork{ 142 - act: catchup.user, 143 - catchup: []*catchupJob{catchup}, 144 - } 145 - c.todo[catchup.user.Uid] = cw 146 - c.maplk.Unlock() 147 - 124 + case cw := <-c.catchup: 148 125 if next == nil { 149 126 next = cw 150 127 rs = c.repoSync ··· 179 156 } 180 157 } 181 158 159 + func (c *CrawlDispatcher) addToCatchupQueue(catchup *catchupJob) *crawlWork { 160 + c.maplk.Lock() 161 + defer c.maplk.Unlock() 162 + job, ok := c.todo[catchup.user.Uid] 163 + // TODO: in the event of receiving a rebase event, we *could* pre-empt all other pending events 164 + if ok { 165 + job.catchup = append(job.catchup, catchup) 166 + return nil 167 + } 168 + 169 + job, ok = c.inProgress[catchup.user.Uid] 170 + if ok { 171 + job.next = append(job.next, catchup) 172 + return nil 173 + } 174 + 175 + cw := &crawlWork{ 176 + act: catchup.user, 177 + catchup: []*catchupJob{catchup}, 178 + } 179 + c.todo[catchup.user.Uid] = cw 180 + return cw 181 + } 182 + 182 183 func (c *CrawlDispatcher) fetchWorker() { 183 184 for { 184 185 select { ··· 195 196 196 197 func (c *CrawlDispatcher) Crawl(ctx context.Context, ai *models.ActorInfo) error { 197 198 if ai.PDS == 0 { 198 - panic("not today!") 199 + panic("must have pds for user in queue") 199 200 } 200 201 201 202 ctx, span := otel.Tracer("crawler").Start(ctx, "addToCrawler") ··· 211 212 212 213 func (c *CrawlDispatcher) AddToCatchupQueue(ctx context.Context, host *models.PDS, u *models.ActorInfo, evt *comatproto.SyncSubscribeRepos_Commit) error { 213 214 if u.PDS == 0 { 214 - panic("not okay") 215 + panic("must have pds for user in queue") 215 216 } 216 217 217 - select { 218 - case c.catchup <- &catchupJob{ 218 + catchup := &catchupJob{ 219 219 evt: evt, 220 220 host: host, 221 221 user: u, 222 - }: 222 + } 223 + 224 + cw := c.addToCatchupQueue(catchup) 225 + if cw == nil { 226 + return nil 227 + } 228 + 229 + select { 230 + case c.catchup <- cw: 223 231 return nil 224 232 case <-ctx.Done(): 225 233 return ctx.Err()
+29 -11
indexer/indexer.go
··· 838 838 } 839 839 840 840 var rebase *comatproto.SyncSubscribeRepos_Commit 841 - for _, job := range job.catchup { 842 - if job.evt.Rebase { 843 - rebase = job.evt 841 + var rebaseIx int 842 + for i, j := range job.catchup { 843 + if j.evt.Rebase { 844 + rebase = j.evt 845 + rebaseIx = i 844 846 break 845 847 } 846 848 } 847 - if rebase == nil { 848 - for _, job := range job.next { 849 - if job.evt.Rebase { 850 - rebase = job.evt 851 - break 852 - } 853 - } 854 - } 855 849 856 850 if rebase != nil { 857 851 if err := ix.repomgr.HandleRebase(ctx, ai.PDS, ai.Uid, ai.Did, (*cid.Cid)(rebase.Prev), (cid.Cid)(rebase.Commit), rebase.Blocks); err != nil { 858 852 return fmt.Errorf("handling rebase: %w", err) 859 853 } 854 + // now process the rest of the catchup events 855 + // these are all events that got received *after* the rebase, but 856 + // before we could start processing it. 857 + // That means these should be the next operations that get cleanly 858 + // applied after the rebase 859 + for _, j := range job.catchup[rebaseIx+1:] { 860 + if err := ix.repomgr.HandleExternalUserEvent(ctx, pds.ID, ai.Uid, ai.Did, (*cid.Cid)(j.evt.Prev), j.evt.Blocks, j.evt.Ops); err != nil { 861 + return fmt.Errorf("post rebase catchup failed: %w", err) 862 + } 863 + } 860 864 return nil 865 + } 866 + 867 + if !(job.initScrape || len(job.catchup) == 0) { 868 + first := job.catchup[0] 869 + if first.evt.Prev == nil || curHead == (cid.Cid)(*first.evt.Prev) { 870 + for _, j := range job.catchup { 871 + if err := ix.repomgr.HandleExternalUserEvent(ctx, pds.ID, ai.Uid, ai.Did, (*cid.Cid)(j.evt.Prev), j.evt.Blocks, j.evt.Ops); err != nil { 872 + // TODO: if we fail here, we should probably fall back to a repo re-sync 873 + return fmt.Errorf("post rebase catchup failed: %w", err) 874 + } 875 + } 876 + 877 + return nil 878 + } 861 879 } 862 880 863 881 var host string
+48
repomgr/ingest_test.go
··· 180 180 181 181 return slice, root, tid 182 182 } 183 + 184 + func TestRebase(t *testing.T) { 185 + dir, err := os.MkdirTemp("", "integtest") 186 + if err != nil { 187 + t.Fatal(err) 188 + } 189 + 190 + maindb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "test.sqlite"))) 191 + if err != nil { 192 + t.Fatal(err) 193 + } 194 + maindb.AutoMigrate(models.ActorInfo{}) 195 + 196 + did := "did:plc:beepboop" 197 + maindb.Create(&models.ActorInfo{ 198 + Did: did, 199 + Uid: 1, 200 + }) 201 + 202 + cs := testCarstore(t, dir) 203 + 204 + repoman := NewRepoManager(cs, &util.FakeKeyManager{}) 205 + 206 + ctx := context.TODO() 207 + if err := repoman.InitNewActor(ctx, 1, "hello.world", "did:plc:foobar", "", "", ""); err != nil { 208 + t.Fatal(err) 209 + } 210 + 211 + for i := 0; i < 5; i++ { 212 + _, _, err := repoman.CreateRecord(ctx, 1, "app.bsky.feed.post", &bsky.FeedPost{ 213 + Text: fmt.Sprintf("hello friend %d", i), 214 + }) 215 + if err != nil { 216 + t.Fatal(err) 217 + } 218 + } 219 + 220 + if err := repoman.DoRebase(ctx, 1); err != nil { 221 + t.Fatal(err) 222 + } 223 + 224 + _, _, err = repoman.CreateRecord(ctx, 1, "app.bsky.feed.post", &bsky.FeedPost{ 225 + Text: "after the rebase", 226 + }) 227 + if err != nil { 228 + t.Fatal(err) 229 + } 230 + }
+5
repomgr/repomgr.go
··· 554 554 return err 555 555 } 556 556 557 + if err := r.CopyDataTo(ctx, ds); err != nil { 558 + return err 559 + } 560 + 557 561 if err := ds.CloseAsRebase(ctx, nroot); err != nil { 558 562 return fmt.Errorf("finalizing rebase: %w", err) 559 563 } ··· 563 567 if _, err := carstore.WriteCarHeader(buf, nroot); err != nil { 564 568 return err 565 569 } 570 + 566 571 robj, err := ds.Get(ctx, nroot) 567 572 if err != nil { 568 573 return err
+78 -1
testing/integ_test.go
··· 3 3 import ( 4 4 "bytes" 5 5 "context" 6 + "encoding/json" 6 7 "fmt" 7 8 "math/rand" 8 9 "strings" ··· 19 20 ) 20 21 21 22 func init() { 22 - log.SetAllLoggers(log.LevelDebug) 23 + log.SetAllLoggers(log.LevelInfo) 23 24 } 24 25 25 26 func TestBGSBasic(t *testing.T) { ··· 380 381 evts2 := b1.Events(t, 0) 381 382 afterEvts := evts2.WaitFor(1) 382 383 assert.Equal(true, afterEvts[0].RepoCommit.Rebase) 384 + } 385 + 386 + func TestRebaseMulti(t *testing.T) { 387 + if testing.Short() { 388 + t.Skip("skipping BGS test in 'short' test mode") 389 + } 390 + assert := assert.New(t) 391 + didr := TestPLC(t) 392 + p1 := MustSetupPDS(t, ".tpds", didr) 393 + p1.Run(t) 394 + 395 + b1 := MustSetupBGS(t, didr) 396 + b1.Run(t) 397 + 398 + b1.tr.TrialHosts = []string{p1.RawHost()} 399 + 400 + p1.RequestScraping(t, b1) 401 + 402 + esgenesis := b1.Events(t, 0) 403 + 404 + time.Sleep(time.Millisecond * 50) 405 + 406 + bob := p1.MustNewUser(t, "bob.tpds") 407 + 408 + for i := 0; i < 10; i++ { 409 + bob.Post(t, fmt.Sprintf("this is bobs post %d", i)) 410 + } 411 + 412 + // wait for 11 events, the first one is the actor creation 413 + firsten := esgenesis.WaitFor(11) 414 + _ = firsten 415 + 416 + fmt.Println("REBASE ONE") 417 + bob.DoRebase(t) 418 + 419 + var posts []*atproto.RepoStrongRef 420 + for i := 0; i < 10; i++ { 421 + ref := bob.Post(t, fmt.Sprintf("this is bobs post after rebase %d", i)) 422 + posts = append(posts, ref) 423 + } 424 + 425 + time.Sleep(time.Millisecond * 50) 426 + 427 + evts1 := b1.Events(t, 0) 428 + defer evts1.Cancel() 429 + 430 + all := evts1.WaitFor(11) 431 + 432 + assert.Equal(true, all[0].RepoCommit.Rebase) 433 + assert.Equal(int64(12), all[0].RepoCommit.Seq) 434 + assert.Equal(posts[0].Cid, all[1].RepoCommit.Ops[0].Cid.String()) 435 + 436 + // and another one! 437 + fmt.Println("REBASE TWO") 438 + bob.DoRebase(t) 439 + 440 + var posts2 []*atproto.RepoStrongRef 441 + for i := 0; i < 15; i++ { 442 + ref := bob.Post(t, fmt.Sprintf("this is bobs post after second rebase %d", i)) 443 + posts2 = append(posts2, ref) 444 + } 445 + 446 + time.Sleep(time.Millisecond * 50) 447 + 448 + evts2 := b1.Events(t, 0) 449 + defer evts2.Cancel() 450 + 451 + all = evts2.WaitFor(16) 452 + 453 + assert.Equal(true, all[0].RepoCommit.Rebase) 454 + assert.Equal(posts2[0].Cid, all[1].RepoCommit.Ops[0].Cid.String()) 455 + } 456 + 457 + func jsonPrint(v any) { 458 + b, _ := json.Marshal(v) 459 + fmt.Println(string(b)) 383 460 } 384 461 385 462 func commitFromSlice(t *testing.T, slice []byte, rcid cid.Cid) *repo.SignedCommit {
+1 -1
testing/utils.go
··· 527 527 go func() { 528 528 rsc := &events.RepoStreamCallbacks{ 529 529 RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { 530 + fmt.Println("received event: ", evt.Seq, evt.Repo, len(es.Events)) 530 531 es.Lk.Lock() 531 532 es.Events = append(es.Events, &events.XRPCStreamEvent{RepoCommit: evt}) 532 - fmt.Println("received event: ", evt.Seq, evt.Repo, len(es.Events)) 533 533 es.Lk.Unlock() 534 534 return nil 535 535 },