this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Remove headstore since new index makes it cheap to query the cardb (#249)

Updates to the headstore have become a new bottleneck and since we have
an index on `usr, seq DESC` in the CarDB, we can get a repo head
trivially and don't need the headstore anymore.

![CleanShot 2023-07-25 at 15 24
13](https://github.com/bluesky-social/indigo/assets/1617325/22c38da8-a6d5-4c88-9367-ca1839695d13)

authored by

Jaz and committed by
GitHub
e0ab2696 b734acd1

+24 -217
+1 -1
cmd/bigsky/main.go
··· 237 237 238 238 kmgr := indexer.NewKeyManager(cachedidr, nil) 239 239 240 - repoman := repomgr.NewRepoManager(repomgr.NewDbHeadStore(db), cstore, kmgr) 240 + repoman := repomgr.NewRepoManager(cstore, kmgr) 241 241 242 242 var persister events.EventPersistence 243 243
+1 -3
cmd/supercollider/main.go
··· 332 332 return nil, nil, err 333 333 } 334 334 335 - hs := repomgr.NewMemHeadStore() 336 - 337 335 mr := did.NewMultiResolver() 338 336 mr.AddHandler("web", &did.WebResolver{ 339 337 Insecure: true, ··· 348 346 349 347 kmgr := indexer.NewKeyManager(cachedidr, key) 350 348 351 - repoman := repomgr.NewRepoManager(hs, cs, kmgr) 349 + repoman := repomgr.NewRepoManager(cs, kmgr) 352 350 353 351 return repoman, key, nil 354 352 }
+2 -2
events/dbpersist_test.go
··· 43 43 Did: "did:example:123", 44 44 }) 45 45 46 - mgr := repomgr.NewRepoManager(repomgr.NewDbHeadStore(db), cs, &util.FakeKeyManager{}) 46 + mgr := repomgr.NewRepoManager(cs, &util.FakeKeyManager{}) 47 47 48 48 err = mgr.InitNewActor(ctx, 1, "alice", "did:example:123", "Alice", "", "") 49 49 if err != nil { ··· 165 165 Did: "did:example:123", 166 166 }) 167 167 168 - mgr := repomgr.NewRepoManager(repomgr.NewDbHeadStore(db), cs, &util.FakeKeyManager{}) 168 + mgr := repomgr.NewRepoManager(cs, &util.FakeKeyManager{}) 169 169 170 170 err = mgr.InitNewActor(ctx, 1, "alice", "did:example:123", "Alice", "", "") 171 171 if err != nil {
+4 -4
events/diskpersist_test.go
··· 39 39 Did: "did:example:123", 40 40 }) 41 41 42 - mgr := repomgr.NewRepoManager(repomgr.NewDbHeadStore(db), cs, &util.FakeKeyManager{}) 42 + mgr := repomgr.NewRepoManager(cs, &util.FakeKeyManager{}) 43 43 44 44 err = mgr.InitNewActor(ctx, 1, "alice", "did:example:123", "Alice", "", "") 45 45 if err != nil { ··· 155 155 Did: "did:example:123", 156 156 }) 157 157 158 - mgr := repomgr.NewRepoManager(repomgr.NewDbHeadStore(db), cs, &util.FakeKeyManager{}) 158 + mgr := repomgr.NewRepoManager(cs, &util.FakeKeyManager{}) 159 159 160 160 err := mgr.InitNewActor(ctx, 1, "alice", "did:example:123", "Alice", "", "") 161 161 if err != nil { ··· 270 270 Did: "did:example:123", 271 271 }) 272 272 273 - mgr := repomgr.NewRepoManager(repomgr.NewDbHeadStore(db), cs, &util.FakeKeyManager{}) 273 + mgr := repomgr.NewRepoManager(cs, &util.FakeKeyManager{}) 274 274 275 275 err := mgr.InitNewActor(ctx, 1, "alice", "did:example:123", "Alice", "", "") 276 276 if err != nil { ··· 369 369 db.AutoMigrate(&pds.Peering{}) 370 370 db.AutoMigrate(&models.ActorInfo{}) 371 371 372 - mgr := repomgr.NewRepoManager(repomgr.NewDbHeadStore(db), cs, &util.FakeKeyManager{}) 372 + mgr := repomgr.NewRepoManager(cs, &util.FakeKeyManager{}) 373 373 374 374 // Create multiple users 375 375 userCount := 10
+1 -1
indexer/posts_test.go
··· 55 55 t.Fatal(err) 56 56 } 57 57 58 - repoman := repomgr.NewRepoManager(repomgr.NewDbHeadStore(maindb), cs, &util.FakeKeyManager{}) 58 + repoman := repomgr.NewRepoManager(cs, &util.FakeKeyManager{}) 59 59 notifman := notifs.NewNotificationManager(maindb, repoman.GetRecord) 60 60 evtman := events.NewEventManager(events.NewMemPersister()) 61 61
+1 -1
labeler/service.go
··· 74 74 didr := &api.PLCServer{Host: plcURL} 75 75 kmgr := indexer.NewKeyManager(didr, repoUser.SigningKey) 76 76 evtmgr := events.NewEventManager(events.NewMemPersister()) 77 - repoman := repomgr.NewRepoManager(repomgr.NewDbHeadStore(db), cs, kmgr) 77 + repoman := repomgr.NewRepoManager(cs, kmgr) 78 78 79 79 if repoUser.Password == "" || repoUser.Did == "" || repoUser.Handle == "" { 80 80 return nil, fmt.Errorf("bad labeler repo config (empty string)")
+1 -2
pds/server.go
··· 76 76 77 77 kmgr := indexer.NewKeyManager(didr, serkey) 78 78 79 - hs := repomgr.NewDbHeadStore(db) 80 - repoman := repomgr.NewRepoManager(hs, cs, kmgr) 79 + repoman := repomgr.NewRepoManager(cs, kmgr) 81 80 notifman := notifs.NewNotificationManager(db, repoman.GetRecord) 82 81 83 82 ix, err := indexer.NewIndexer(db, notifman, evtman, didr, repoman, false, true)
+1 -2
repomgr/bench_test.go
··· 58 58 b.Fatal(err) 59 59 } 60 60 61 - hs := NewMemHeadStore() 62 - repoman := NewRepoManager(hs, cs, &util.FakeKeyManager{}) 61 + repoman := NewRepoManager(cs, &util.FakeKeyManager{}) 63 62 64 63 ctx := context.TODO() 65 64 if err := repoman.InitNewActor(ctx, 1, "hello.world", "did:foo:bar", "catdog", "", ""); err != nil {
-71
repomgr/dbheadstore.go
··· 1 - package repomgr 2 - 3 - import ( 4 - "context" 5 - 6 - "github.com/bluesky-social/indigo/models" 7 - 8 - "github.com/ipfs/go-cid" 9 - "go.opentelemetry.io/otel" 10 - "gorm.io/gorm" 11 - "gorm.io/gorm/clause" 12 - ) 13 - 14 - func NewDbHeadStore(db *gorm.DB) *DbHeadStore { 15 - db.AutoMigrate(RepoHead{}) 16 - 17 - return &DbHeadStore{db} 18 - } 19 - 20 - type DbHeadStore struct { 21 - db *gorm.DB 22 - } 23 - 24 - func (hs *DbHeadStore) InitUser(ctx context.Context, user models.Uid, root cid.Cid) error { 25 - if err := hs.db.WithContext(ctx).Create(&RepoHead{ 26 - Usr: user, 27 - Root: root.String(), 28 - }).Error; err != nil { 29 - return err 30 - } 31 - 32 - return nil 33 - } 34 - 35 - func (hs *DbHeadStore) UpdateUserRepoHead(ctx context.Context, user models.Uid, root cid.Cid) error { 36 - ctx, span := otel.Tracer("repoman").Start(ctx, "UpdateUserRepoHead") 37 - defer span.End() 38 - 39 - if err := hs.db.WithContext(ctx).Clauses(clause.OnConflict{ 40 - Columns: []clause.Column{{Name: "usr"}}, 41 - DoUpdates: clause.AssignmentColumns([]string{"root"}), 42 - }).Create(&RepoHead{ 43 - Usr: user, 44 - Root: root.String(), 45 - }).Error; err != nil { 46 - return err 47 - } 48 - 49 - return nil 50 - } 51 - 52 - func (hs *DbHeadStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) { 53 - ctx, span := otel.Tracer("repoman").Start(ctx, "GetUserRepoHead") 54 - defer span.End() 55 - 56 - var headrec RepoHead 57 - if err := hs.db.WithContext(ctx).Find(&headrec, "usr = ?", user).Error; err != nil { 58 - return cid.Undef, err 59 - } 60 - 61 - if headrec.ID == 0 { 62 - return cid.Undef, gorm.ErrRecordNotFound 63 - } 64 - 65 - cc, err := cid.Decode(headrec.Root) 66 - if err != nil { 67 - return cid.Undef, err 68 - } 69 - 70 - return cc, nil 71 - }
+2 -9
repomgr/ingest_test.go
··· 39 39 t.Fatal(err) 40 40 } 41 41 42 - maindb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "test.sqlite"))) 43 - if err != nil { 44 - t.Fatal(err) 45 - } 46 - 47 42 cardb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "car.sqlite"))) 48 43 if err != nil { 49 44 t.Fatal(err) ··· 59 54 t.Fatal(err) 60 55 } 61 56 62 - hs := NewDbHeadStore(maindb) 63 - repoman := NewRepoManager(hs, cs, &util.FakeKeyManager{}) 57 + repoman := NewRepoManager(cs, &util.FakeKeyManager{}) 64 58 65 59 fi, err := os.Open("../testing/testdata/divy.repo") 66 60 if err != nil { ··· 113 107 114 108 cs := testCarstore(t, dir) 115 109 116 - hs := NewDbHeadStore(maindb) 117 - repoman := NewRepoManager(hs, cs, &util.FakeKeyManager{}) 110 + repoman := NewRepoManager(cs, &util.FakeKeyManager{}) 118 111 119 112 dir2, err := os.MkdirTemp("", "integtest") 120 113 if err != nil {
-57
repomgr/memheadstore.go
··· 1 - package repomgr 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - "sync" 7 - 8 - "github.com/bluesky-social/indigo/models" 9 - 10 - "github.com/ipfs/go-cid" 11 - ) 12 - 13 - type MemHeadStore struct { 14 - heads map[models.Uid]cid.Cid 15 - lk sync.RWMutex 16 - } 17 - 18 - func NewMemHeadStore() *MemHeadStore { 19 - return &MemHeadStore{ 20 - heads: make(map[models.Uid]cid.Cid), 21 - } 22 - } 23 - 24 - func (hs *MemHeadStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) { 25 - hs.lk.RLock() 26 - defer hs.lk.RUnlock() 27 - h, ok := hs.heads[user] 28 - if !ok { 29 - return cid.Undef, fmt.Errorf("user head not found") 30 - } 31 - 32 - return h, nil 33 - } 34 - 35 - func (hs *MemHeadStore) UpdateUserRepoHead(ctx context.Context, user models.Uid, root cid.Cid) error { 36 - hs.lk.Lock() 37 - defer hs.lk.Unlock() 38 - _, ok := hs.heads[user] 39 - if !ok { 40 - return fmt.Errorf("cannot update user head if it doesnt exist already") 41 - } 42 - 43 - hs.heads[user] = root 44 - return nil 45 - } 46 - 47 - func (hs *MemHeadStore) InitUser(ctx context.Context, user models.Uid, root cid.Cid) error { 48 - hs.lk.Lock() 49 - defer hs.lk.Unlock() 50 - _, ok := hs.heads[user] 51 - if ok { 52 - return fmt.Errorf("cannot init user head if it exists already") 53 - } 54 - 55 - hs.heads[user] = root 56 - return nil 57 - }
+9 -63
repomgr/repomgr.go
··· 31 31 32 32 var log = logging.Logger("repomgr") 33 33 34 - func NewRepoManager(hs HeadStore, cs *carstore.CarStore, kmgr KeyManager) *RepoManager { 34 + func NewRepoManager(cs *carstore.CarStore, kmgr KeyManager) *RepoManager { 35 35 36 36 return &RepoManager{ 37 - hs: hs, 38 37 cs: cs, 39 38 userLocks: make(map[models.Uid]*userLock), 40 39 kmgr: kmgr, 41 40 } 42 41 } 43 42 44 - type HeadStore interface { 45 - GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) 46 - UpdateUserRepoHead(ctx context.Context, user models.Uid, root cid.Cid) error 47 - InitUser(ctx context.Context, user models.Uid, root cid.Cid) error 48 - } 49 - 50 43 type KeyManager interface { 51 44 VerifyUserSignature(context.Context, string, []byte, []byte) error 52 45 SignForUser(context.Context, string, []byte) ([]byte, error) ··· 58 51 59 52 type RepoManager struct { 60 53 cs *carstore.CarStore 61 - hs HeadStore 62 54 kmgr KeyManager 63 55 64 56 lklk sync.Mutex ··· 154 146 unlock := rm.lockUser(ctx, user) 155 147 defer unlock() 156 148 157 - head, err := rm.hs.GetUserRepoHead(ctx, user) 149 + head, err := rm.cs.GetUserRepoHead(ctx, user) 158 150 if err != nil { 159 151 return "", cid.Undef, err 160 152 } ··· 184 176 return "", cid.Undef, fmt.Errorf("close with root: %w", err) 185 177 } 186 178 187 - // TODO: what happens if this update fails? 188 - if err := rm.hs.UpdateUserRepoHead(ctx, user, nroot); err != nil { 189 - return "", cid.Undef, fmt.Errorf("updating user head: %w", err) 190 - } 191 - 192 179 var oldroot *cid.Cid 193 180 if head.Defined() { 194 181 oldroot = &head ··· 220 207 unlock := rm.lockUser(ctx, user) 221 208 defer unlock() 222 209 223 - head, err := rm.hs.GetUserRepoHead(ctx, user) 210 + head, err := rm.cs.GetUserRepoHead(ctx, user) 224 211 if err != nil { 225 212 return cid.Undef, err 226 213 } ··· 251 238 return cid.Undef, fmt.Errorf("close with root: %w", err) 252 239 } 253 240 254 - // TODO: what happens if this update fails? 255 - if err := rm.hs.UpdateUserRepoHead(ctx, user, nroot); err != nil { 256 - return cid.Undef, fmt.Errorf("updating user head: %w", err) 257 - } 258 - 259 241 var oldroot *cid.Cid 260 242 if head.Defined() { 261 243 oldroot = &head ··· 287 269 unlock := rm.lockUser(ctx, user) 288 270 defer unlock() 289 271 290 - head, err := rm.hs.GetUserRepoHead(ctx, user) 272 + head, err := rm.cs.GetUserRepoHead(ctx, user) 291 273 if err != nil { 292 274 return err 293 275 } ··· 317 299 return fmt.Errorf("close with root: %w", err) 318 300 } 319 301 320 - // TODO: what happens if this update fails? 321 - if err := rm.hs.UpdateUserRepoHead(ctx, user, nroot); err != nil { 322 - return fmt.Errorf("updating user head: %w", err) 323 - } 324 302 var oldroot *cid.Cid 325 303 if head.Defined() { 326 304 oldroot = &head ··· 382 360 return fmt.Errorf("close with root: %w", err) 383 361 } 384 362 385 - if err := rm.hs.InitUser(ctx, user, root); err != nil { 386 - return fmt.Errorf("initializing user in headstore: %w", err) 387 - } 388 - 389 363 if rm.events != nil { 390 364 rm.events(ctx, &RepoEvent{ 391 365 User: user, ··· 407 381 unlock := rm.lockUser(ctx, user) 408 382 defer unlock() 409 383 410 - return rm.hs.GetUserRepoHead(ctx, user) 384 + return rm.cs.GetUserRepoHead(ctx, user) 411 385 } 412 386 413 387 func (rm *RepoManager) ReadRepo(ctx context.Context, user models.Uid, earlyCid, lateCid cid.Cid, w io.Writer) error { ··· 420 394 return cid.Undef, nil, err 421 395 } 422 396 423 - head, err := rm.hs.GetUserRepoHead(ctx, user) 397 + head, err := rm.cs.GetUserRepoHead(ctx, user) 424 398 if err != nil { 425 399 return cid.Undef, nil, err 426 400 } ··· 448 422 return nil, err 449 423 } 450 424 451 - head, err := rm.hs.GetUserRepoHead(ctx, uid) 425 + head, err := rm.cs.GetUserRepoHead(ctx, uid) 452 426 if err != nil { 453 427 return nil, err 454 428 } ··· 532 506 533 507 if err := ds.CloseAsRebase(ctx, root); err != nil { 534 508 return fmt.Errorf("finalizing rebase: %w", err) 535 - } 536 - 537 - // TODO: what happens if this update fails? 538 - if err := rm.hs.UpdateUserRepoHead(ctx, uid, root); err != nil { 539 - return fmt.Errorf("updating user head: %w", err) 540 509 } 541 510 542 511 if rm.events != nil { ··· 715 684 return fmt.Errorf("close with root: %w", err) 716 685 } 717 686 718 - // TODO: what happens if this update fails? 719 - if err := rm.hs.UpdateUserRepoHead(ctx, uid, root); err != nil { 720 - return fmt.Errorf("updating user head: %w", err) 721 - } 722 - 723 687 if rm.events != nil { 724 688 rm.events(ctx, &RepoEvent{ 725 689 User: uid, ··· 745 709 unlock := rm.lockUser(ctx, user) 746 710 defer unlock() 747 711 748 - head, err := rm.hs.GetUserRepoHead(ctx, user) 712 + head, err := rm.cs.GetUserRepoHead(ctx, user) 749 713 if err != nil { 750 714 return err 751 715 } ··· 827 791 return fmt.Errorf("close with root: %w", err) 828 792 } 829 793 830 - // TODO: what happens if this update fails? 831 - if err := rm.hs.UpdateUserRepoHead(ctx, user, nroot); err != nil { 832 - return fmt.Errorf("updating user head: %w", err) 833 - } 834 - 835 794 var oldroot *cid.Cid 836 795 if head.Defined() { 837 796 oldroot = &head ··· 857 816 unlock := rm.lockUser(ctx, user) 858 817 defer unlock() 859 818 860 - head, err := rm.hs.GetUserRepoHead(ctx, user) 861 - if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { 862 - return err 863 - } 864 - 865 - cshead, err := rm.cs.GetUserRepoHead(ctx, user) 819 + head, err := rm.cs.GetUserRepoHead(ctx, user) 866 820 if err != nil { 867 821 return err 868 - } 869 - 870 - if head != cshead { 871 - return fmt.Errorf("mismatch between carstore head tracking and repomgr: %s != %s", head, cshead) 872 822 } 873 823 874 824 if head != oldest { ··· 915 865 return err 916 866 } 917 867 918 - if err := rm.hs.UpdateUserRepoHead(ctx, user, nu); err != nil { 919 - // TODO: this will lead to things being in an inconsistent state 920 - return fmt.Errorf("failed to update repo head: %w", err) 921 - } 922 868 var oldroot *cid.Cid 923 869 if old.Defined() { 924 870 oldroot = &old
+1 -1
testing/utils.go
··· 425 425 //kmgr := indexer.NewKeyManager(didr, nil) 426 426 kmgr := &bsutil.FakeKeyManager{} 427 427 428 - repoman := repomgr.NewRepoManager(repomgr.NewDbHeadStore(maindb), cs, kmgr) 428 + repoman := repomgr.NewRepoManager(cs, kmgr) 429 429 430 430 notifman := notifs.NewNotificationManager(maindb, repoman.GetRecord) 431 431