this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

extract headStore to make it easier to swap out database (#213)

The in-memory variant goes about 20% faster in the benchmark. The rest
is carstore costs

authored by

Whyrusleeping and committed by
GitHub
76a63856 1495fe3c

+217 -67
+1 -1
cmd/bigsky/main.go
··· 207 207 208 208 kmgr := indexer.NewKeyManager(cachedidr, nil) 209 209 210 - repoman := repomgr.NewRepoManager(db, cstore, kmgr) 210 + repoman := repomgr.NewRepoManager(repomgr.NewDbHeadStore(db), cstore, kmgr) 211 211 212 212 dbp, err := events.NewDbPersistence(db, cstore, nil) 213 213 if err != nil {
+1 -1
events/dbpersist_test.go
··· 43 43 Did: "did:example:123", 44 44 }) 45 45 46 - mgr := repomgr.NewRepoManager(db, cs, &util.FakeKeyManager{}) 46 + mgr := repomgr.NewRepoManager(repomgr.NewDbHeadStore(db), cs, &util.FakeKeyManager{}) 47 47 48 48 err = mgr.InitNewActor(ctx, 1, "alice", "did:example:123", "Alice", "", "") 49 49 if err != nil {
+1 -1
indexer/posts_test.go
··· 55 55 t.Fatal(err) 56 56 } 57 57 58 - repoman := repomgr.NewRepoManager(maindb, cs, &util.FakeKeyManager{}) 58 + repoman := repomgr.NewRepoManager(repomgr.NewDbHeadStore(maindb), cs, &util.FakeKeyManager{}) 59 59 notifman := notifs.NewNotificationManager(maindb, repoman.GetRecord) 60 60 evtman := events.NewEventManager(events.NewMemPersister()) 61 61
+1 -1
labeler/service.go
··· 75 75 didr := &api.PLCServer{Host: plcURL} 76 76 kmgr := indexer.NewKeyManager(didr, repoUser.SigningKey) 77 77 evtmgr := events.NewEventManager(events.NewMemPersister()) 78 - repoman := repomgr.NewRepoManager(db, cs, kmgr) 78 + repoman := repomgr.NewRepoManager(repomgr.NewDbHeadStore(db), cs, kmgr) 79 79 80 80 if repoUser.Password == "" || repoUser.Did == "" || repoUser.Handle == "" { 81 81 return nil, fmt.Errorf("bad labeler repo config (empty string)")
+2 -1
pds/server.go
··· 77 77 78 78 kmgr := indexer.NewKeyManager(didr, serkey) 79 79 80 - repoman := repomgr.NewRepoManager(db, cs, kmgr) 80 + hs := repomgr.NewDbHeadStore(db) 81 + repoman := repomgr.NewRepoManager(hs, cs, kmgr) 81 82 notifman := notifs.NewNotificationManager(db, repoman.GetRecord) 82 83 83 84 ix, err := indexer.NewIndexer(db, notifman, evtman, didr, repoman, false, true)
+66
repomgr/bench_test.go
··· 1 + package repomgr 2 + 3 + import ( 4 + "context" 5 + "os" 6 + "path/filepath" 7 + "testing" 8 + 9 + bsky "github.com/bluesky-social/indigo/api/bsky" 10 + "github.com/bluesky-social/indigo/carstore" 11 + "github.com/bluesky-social/indigo/util" 12 + "gorm.io/driver/sqlite" 13 + "gorm.io/gorm" 14 + ) 15 + 16 + func setupDb(t testing.TB, p string) *gorm.DB { 17 + t.Helper() 18 + 19 + db, err := gorm.Open(sqlite.Open(p)) 20 + if err != nil { 21 + t.Fatal(err) 22 + } 23 + 24 + if err := db.Exec("PRAGMA journal_mode=WAL;").Error; err != nil { 25 + t.Fatal(err) 26 + } 27 + 28 + return db 29 + } 30 + 31 + func BenchmarkRepoMgrCreates(b *testing.B) { 32 + dir, err := os.MkdirTemp("", "integtest") 33 + if err != nil { 34 + b.Fatal(err) 35 + } 36 + 37 + cardb := setupDb(b, filepath.Join(dir, "car.sqlite")) 38 + 39 + cspath := filepath.Join(dir, "carstore") 40 + if err := os.Mkdir(cspath, 0775); err != nil { 41 + b.Fatal(err) 42 + } 43 + 44 + cs, err := carstore.NewCarStore(cardb, cspath) 45 + if err != nil { 46 + b.Fatal(err) 47 + } 48 + 49 + hs := NewMemHeadStore() 50 + repoman := NewRepoManager(hs, cs, &util.FakeKeyManager{}) 51 + 52 + ctx := context.TODO() 53 + if err := repoman.InitNewActor(ctx, 1, "hello.world", "did:foo:bar", "catdog", "", ""); err != nil { 54 + b.Fatal(err) 55 + } 56 + 57 + b.ResetTimer() 58 + for i := 0; i < b.N; i++ { 59 + _, _, err = repoman.CreateRecord(ctx, 1, "app.bsky.feed.post", &bsky.FeedPost{ 60 + Text: "cats", 61 + }) 62 + if err != nil { 63 + b.Fatal(err) 64 + } 65 + } 66 + }
+67
repomgr/dbheadstore.go
··· 1 + package repomgr 2 + 3 + import ( 4 + "context" 5 + 6 + "github.com/bluesky-social/indigo/util" 7 + "github.com/ipfs/go-cid" 8 + "go.opentelemetry.io/otel" 9 + "gorm.io/gorm" 10 + "gorm.io/gorm/clause" 11 + ) 12 + 13 + func NewDbHeadStore(db *gorm.DB) *DbHeadStore { 14 + db.AutoMigrate(RepoHead{}) 15 + 16 + return &DbHeadStore{db} 17 + } 18 + 19 + type DbHeadStore struct { 20 + db *gorm.DB 21 + } 22 + 23 + func (hs *DbHeadStore) InitUser(ctx context.Context, user util.Uid, root cid.Cid) error { 24 + if err := hs.db.Create(&RepoHead{ 25 + Usr: user, 26 + Root: root.String(), 27 + }).Error; err != nil { 28 + return err 29 + } 30 + 31 + return nil 32 + } 33 + 34 + func (hs *DbHeadStore) UpdateUserRepoHead(ctx context.Context, user util.Uid, root cid.Cid) error { 35 + if err := hs.db.WithContext(ctx).Clauses(clause.OnConflict{ 36 + Columns: []clause.Column{{Name: "usr"}}, 37 + DoUpdates: clause.AssignmentColumns([]string{"root"}), 38 + }).Create(&RepoHead{ 39 + Usr: user, 40 + Root: root.String(), 41 + }).Error; err != nil { 42 + return err 43 + } 44 + 45 + return nil 46 + } 47 + 48 + func (hs *DbHeadStore) GetUserRepoHead(ctx context.Context, user util.Uid) (cid.Cid, error) { 49 + ctx, span := otel.Tracer("repoman").Start(ctx, "GetUserRepoHead") 50 + defer span.End() 51 + 52 + var headrec RepoHead 53 + if err := hs.db.Find(&headrec, "usr = ?", user).Error; err != nil { 54 + return cid.Undef, err 55 + } 56 + 57 + if headrec.ID == 0 { 58 + return cid.Undef, gorm.ErrRecordNotFound 59 + } 60 + 61 + cc, err := cid.Decode(headrec.Root) 62 + if err != nil { 63 + return cid.Undef, err 64 + } 65 + 66 + return cc, nil 67 + }
+4 -2
repomgr/ingest_test.go
··· 59 59 t.Fatal(err) 60 60 } 61 61 62 - repoman := NewRepoManager(maindb, cs, &util.FakeKeyManager{}) 62 + hs := NewDbHeadStore(maindb) 63 + repoman := NewRepoManager(hs, cs, &util.FakeKeyManager{}) 63 64 64 65 fi, err := os.Open("../testing/testdata/divy.repo") 65 66 if err != nil { ··· 112 113 113 114 cs := testCarstore(t, dir) 114 115 115 - repoman := NewRepoManager(maindb, cs, &util.FakeKeyManager{}) 116 + hs := NewDbHeadStore(maindb) 117 + repoman := NewRepoManager(hs, cs, &util.FakeKeyManager{}) 116 118 117 119 dir2, err := os.MkdirTemp("", "integtest") 118 120 if err != nil {
+48
repomgr/memheadstore.go
··· 1 + package repomgr 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + 7 + "github.com/bluesky-social/indigo/util" 8 + "github.com/ipfs/go-cid" 9 + ) 10 + 11 + type MemHeadStore struct { 12 + heads map[util.Uid]cid.Cid 13 + } 14 + 15 + func NewMemHeadStore() *MemHeadStore { 16 + return &MemHeadStore{ 17 + heads: make(map[util.Uid]cid.Cid), 18 + } 19 + } 20 + 21 + func (hs *MemHeadStore) GetUserRepoHead(ctx context.Context, user util.Uid) (cid.Cid, error) { 22 + h, ok := hs.heads[user] 23 + if !ok { 24 + return cid.Undef, fmt.Errorf("user head not found") 25 + } 26 + 27 + return h, nil 28 + } 29 + 30 + func (hs *MemHeadStore) UpdateUserRepoHead(ctx context.Context, user util.Uid, root cid.Cid) error { 31 + _, ok := hs.heads[user] 32 + if !ok { 33 + return fmt.Errorf("cannot update user head if it doesnt exist already") 34 + } 35 + 36 + hs.heads[user] = root 37 + return nil 38 + } 39 + 40 + func (hs *MemHeadStore) InitUser(ctx context.Context, user util.Uid, root cid.Cid) error { 41 + _, ok := hs.heads[user] 42 + if ok { 43 + return fmt.Errorf("cannot init user head if it exists already") 44 + } 45 + 46 + hs.heads[user] = root 47 + return nil 48 + }
+25 -59
repomgr/repomgr.go
··· 26 26 cbg "github.com/whyrusleeping/cbor-gen" 27 27 "go.opentelemetry.io/otel" 28 28 "gorm.io/gorm" 29 - "gorm.io/gorm/clause" 30 29 ) 31 30 32 31 var log = logging.Logger("repomgr") 33 32 34 - func NewRepoManager(db *gorm.DB, cs *carstore.CarStore, kmgr KeyManager) *RepoManager { 35 - db.AutoMigrate(RepoHead{}) 33 + func NewRepoManager(hs HeadStore, cs *carstore.CarStore, kmgr KeyManager) *RepoManager { 36 34 37 35 return &RepoManager{ 38 - db: db, 36 + hs: hs, 39 37 cs: cs, 40 38 userLocks: make(map[util.Uid]*userLock), 41 39 kmgr: kmgr, 42 40 } 43 41 } 44 42 43 + type HeadStore interface { 44 + GetUserRepoHead(ctx context.Context, user util.Uid) (cid.Cid, error) 45 + UpdateUserRepoHead(ctx context.Context, user util.Uid, root cid.Cid) error 46 + InitUser(ctx context.Context, user util.Uid, root cid.Cid) error 47 + } 48 + 45 49 type KeyManager interface { 46 50 VerifyUserSignature(context.Context, string, []byte, []byte) error 47 51 SignForUser(context.Context, string, []byte) ([]byte, error) ··· 53 57 54 58 type RepoManager struct { 55 59 cs *carstore.CarStore 56 - db *gorm.DB 60 + hs HeadStore 57 61 kmgr KeyManager 58 62 59 63 lklk sync.Mutex ··· 139 143 } 140 144 } 141 145 142 - func (rm *RepoManager) getUserRepoHead(ctx context.Context, user util.Uid) (cid.Cid, error) { 143 - ctx, span := otel.Tracer("repoman").Start(ctx, "getUserRepoHead") 144 - defer span.End() 145 - 146 - var headrec RepoHead 147 - if err := rm.db.Find(&headrec, "usr = ?", user).Error; err != nil { 148 - return cid.Undef, err 149 - } 150 - 151 - if headrec.ID == 0 { 152 - return cid.Undef, gorm.ErrRecordNotFound 153 - } 154 - 155 - cc, err := cid.Decode(headrec.Root) 156 - if err != nil { 157 - return cid.Undef, err 158 - } 159 - 160 - return cc, nil 161 - } 162 - 163 - func (rm *RepoManager) updateUserRepoHead(ctx context.Context, user util.Uid, root cid.Cid) error { 164 - if err := rm.db.WithContext(ctx).Clauses(clause.OnConflict{ 165 - Columns: []clause.Column{{Name: "usr"}}, 166 - DoUpdates: clause.AssignmentColumns([]string{"root"}), 167 - }).Create(&RepoHead{ 168 - Usr: user, 169 - Root: root.String(), 170 - }).Error; err != nil { 171 - return err 172 - } 173 - 174 - return nil 175 - } 176 - 177 146 func (rm *RepoManager) CarStore() *carstore.CarStore { 178 147 return rm.cs 179 148 } ··· 185 154 unlock := rm.lockUser(ctx, user) 186 155 defer unlock() 187 156 188 - head, err := rm.getUserRepoHead(ctx, user) 157 + head, err := rm.hs.GetUserRepoHead(ctx, user) 189 158 if err != nil { 190 159 return "", cid.Undef, err 191 160 } ··· 216 185 } 217 186 218 187 // TODO: what happens if this update fails? 219 - if err := rm.updateUserRepoHead(ctx, user, nroot); err != nil { 188 + if err := rm.hs.UpdateUserRepoHead(ctx, user, nroot); err != nil { 220 189 return "", cid.Undef, fmt.Errorf("updating user head: %w", err) 221 190 } 222 191 ··· 251 220 unlock := rm.lockUser(ctx, user) 252 221 defer unlock() 253 222 254 - head, err := rm.getUserRepoHead(ctx, user) 223 + head, err := rm.hs.GetUserRepoHead(ctx, user) 255 224 if err != nil { 256 225 return cid.Undef, err 257 226 } ··· 283 252 } 284 253 285 254 // TODO: what happens if this update fails? 286 - if err := rm.updateUserRepoHead(ctx, user, nroot); err != nil { 255 + if err := rm.hs.UpdateUserRepoHead(ctx, user, nroot); err != nil { 287 256 return cid.Undef, fmt.Errorf("updating user head: %w", err) 288 257 } 289 258 ··· 318 287 unlock := rm.lockUser(ctx, user) 319 288 defer unlock() 320 289 321 - head, err := rm.getUserRepoHead(ctx, user) 290 + head, err := rm.hs.GetUserRepoHead(ctx, user) 322 291 if err != nil { 323 292 return err 324 293 } ··· 349 318 } 350 319 351 320 // TODO: what happens if this update fails? 352 - if err := rm.updateUserRepoHead(ctx, user, nroot); err != nil { 321 + if err := rm.hs.UpdateUserRepoHead(ctx, user, nroot); err != nil { 353 322 return fmt.Errorf("updating user head: %w", err) 354 323 } 355 324 var oldroot *cid.Cid ··· 413 382 return err 414 383 } 415 384 416 - if err := rm.db.Create(&RepoHead{ 417 - Usr: user, 418 - Root: root.String(), 419 - }).Error; err != nil { 385 + if err := rm.hs.InitUser(ctx, user, root); err != nil { 420 386 return err 421 387 } 422 388 ··· 441 407 unlock := rm.lockUser(ctx, user) 442 408 defer unlock() 443 409 444 - return rm.getUserRepoHead(ctx, user) 410 + return rm.hs.GetUserRepoHead(ctx, user) 445 411 } 446 412 447 413 func (rm *RepoManager) ReadRepo(ctx context.Context, user util.Uid, earlyCid, lateCid cid.Cid, w io.Writer) error { ··· 454 420 return cid.Undef, nil, err 455 421 } 456 422 457 - head, err := rm.getUserRepoHead(ctx, user) 423 + head, err := rm.hs.GetUserRepoHead(ctx, user) 458 424 if err != nil { 459 425 return cid.Undef, nil, err 460 426 } ··· 482 448 return nil, err 483 449 } 484 450 485 - head, err := rm.getUserRepoHead(ctx, uid) 451 + head, err := rm.hs.GetUserRepoHead(ctx, uid) 486 452 if err != nil { 487 453 return nil, err 488 454 } ··· 569 535 } 570 536 571 537 // TODO: what happens if this update fails? 572 - if err := rm.updateUserRepoHead(ctx, uid, root); err != nil { 538 + if err := rm.hs.UpdateUserRepoHead(ctx, uid, root); err != nil { 573 539 return fmt.Errorf("updating user head: %w", err) 574 540 } 575 541 ··· 750 716 } 751 717 752 718 // TODO: what happens if this update fails? 753 - if err := rm.updateUserRepoHead(ctx, uid, root); err != nil { 719 + if err := rm.hs.UpdateUserRepoHead(ctx, uid, root); err != nil { 754 720 return fmt.Errorf("updating user head: %w", err) 755 721 } 756 722 ··· 779 745 unlock := rm.lockUser(ctx, user) 780 746 defer unlock() 781 747 782 - head, err := rm.getUserRepoHead(ctx, user) 748 + head, err := rm.hs.GetUserRepoHead(ctx, user) 783 749 if err != nil { 784 750 return err 785 751 } ··· 862 828 } 863 829 864 830 // TODO: what happens if this update fails? 865 - if err := rm.updateUserRepoHead(ctx, user, nroot); err != nil { 831 + if err := rm.hs.UpdateUserRepoHead(ctx, user, nroot); err != nil { 866 832 return fmt.Errorf("updating user head: %w", err) 867 833 } 868 834 ··· 891 857 unlock := rm.lockUser(ctx, user) 892 858 defer unlock() 893 859 894 - head, err := rm.getUserRepoHead(ctx, user) 860 + head, err := rm.hs.GetUserRepoHead(ctx, user) 895 861 if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { 896 862 return err 897 863 } ··· 949 915 return err 950 916 } 951 917 952 - if err := rm.updateUserRepoHead(ctx, user, nu); err != nil { 918 + if err := rm.hs.UpdateUserRepoHead(ctx, user, nu); err != nil { 953 919 // TODO: this will lead to things being in an inconsistent state 954 920 return fmt.Errorf("failed to update repo head: %w", err) 955 921 }
+1 -1
testing/utils.go
··· 425 425 //kmgr := indexer.NewKeyManager(didr, nil) 426 426 kmgr := &bsutil.FakeKeyManager{} 427 427 428 - repoman := repomgr.NewRepoManager(maindb, cs, kmgr) 428 + repoman := repomgr.NewRepoManager(repomgr.NewDbHeadStore(maindb), cs, kmgr) 429 429 430 430 notifman := notifs.NewNotificationManager(maindb, repoman.GetRecord) 431 431