this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

carstore.CarStore is an interface, carstore.FileCarStore implements it

authored by

Brian Olson and committed by
Brian Olson
5c2cb480 daa3810f

+60 -47
+44 -31
carstore/bs.go
··· 48 48 49 49 const BigShardThreshold = 2 << 20 50 50 51 - type CarStore struct { 51 + type CarStore interface { 52 + CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) 53 + GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) 54 + GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) 55 + GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) 56 + ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) 57 + NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) 58 + ReadOnlySession(user models.Uid) (*DeltaSession, error) 59 + ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error 60 + Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) 61 + WipeUserData(ctx context.Context, user models.Uid) error 62 + } 63 + 64 + type FileCarStore struct { 52 65 meta *CarStoreGormMeta 53 66 rootDir string 54 67 ··· 56 69 lastShardCache map[models.Uid]*CarShard 57 70 } 58 71 59 - func NewCarStore(meta *gorm.DB, root string) (*CarStore, error) { 72 + func NewCarStore(meta *gorm.DB, root string) (CarStore, error) { 60 73 if _, err := os.Stat(root); err != nil { 61 74 if !os.IsNotExist(err) { 62 75 return nil, err ··· 73 86 return nil, err 74 87 } 75 88 76 - return &CarStore{ 89 + return &FileCarStore{ 77 90 meta: &CarStoreGormMeta{meta: meta}, 78 91 rootDir: root, 79 92 lastShardCache: make(map[models.Uid]*CarShard), ··· 81 94 } 82 95 83 96 type userView struct { 84 - cs *CarStore 97 + cs *FileCarStore 85 98 user models.Uid 86 99 87 100 cache map[cid.Cid]blockformat.Block ··· 256 269 baseCid cid.Cid 257 270 seq int 258 271 readonly bool 259 - cs *CarStore 272 + cs *FileCarStore 260 273 lastRev string 261 274 } 262 275 263 - func (cs *CarStore) checkLastShardCache(user models.Uid) *CarShard { 276 + func (cs *FileCarStore) checkLastShardCache(user models.Uid) *CarShard { 264 277 cs.lscLk.Lock() 265 278 defer cs.lscLk.Unlock() 266 279 ··· 272 285 return nil 273 286 } 274 287 275 - func (cs *CarStore) removeLastShardCache(user models.Uid) { 288 + func (cs *FileCarStore) removeLastShardCache(user models.Uid) { 276 289 cs.lscLk.Lock() 277 290 defer cs.lscLk.Unlock() 278 291 279 292 delete(cs.lastShardCache, user) 280 293 } 281 294 282 - func (cs *CarStore) putLastShardCache(ls *CarShard) { 295 + func (cs *FileCarStore) putLastShardCache(ls *CarShard) { 283 296 cs.lscLk.Lock() 284 297 defer cs.lscLk.Unlock() 285 298 286 299 cs.lastShardCache[ls.Usr] = ls 287 300 } 288 301 289 - func (cs *CarStore) getLastShard(ctx context.Context, user models.Uid) (*CarShard, error) { 302 + func (cs *FileCarStore) getLastShard(ctx context.Context, user models.Uid) (*CarShard, error) { 290 303 ctx, span := otel.Tracer("carstore").Start(ctx, "getLastShard") 291 304 defer span.End() 292 305 ··· 306 319 307 320 var ErrRepoBaseMismatch = fmt.Errorf("attempted a delta session on top of the wrong previous head") 308 321 309 - func (cs *CarStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) { 322 + func (cs *FileCarStore) NewDeltaSession(ctx context.Context, user models.Uid, since *string) (*DeltaSession, error) { 310 323 ctx, span := otel.Tracer("carstore").Start(ctx, "NewSession") 311 324 defer span.End() 312 325 ··· 338 351 }, nil 339 352 } 340 353 341 - func (cs *CarStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) { 354 + func (cs *FileCarStore) ReadOnlySession(user models.Uid) (*DeltaSession, error) { 342 355 return &DeltaSession{ 343 356 base: &userView{ 344 357 user: user, ··· 353 366 } 354 367 355 368 // TODO: incremental is only ever called true, remove the param 356 - func (cs *CarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error { 369 + func (cs *FileCarStore) ReadUserCar(ctx context.Context, user models.Uid, sinceRev string, incremental bool, w io.Writer) error { 357 370 ctx, span := otel.Tracer("carstore").Start(ctx, "ReadUserCar") 358 371 defer span.End() 359 372 ··· 401 414 402 415 // inner loop part of ReadUserCar 403 416 // copy shard blocks from disk to Writer 404 - func (cs *CarStore) writeShardBlocks(ctx context.Context, sh *CarShard, w io.Writer) error { 417 + func (cs *FileCarStore) writeShardBlocks(ctx context.Context, sh *CarShard, w io.Writer) error { 405 418 ctx, span := otel.Tracer("carstore").Start(ctx, "writeShardBlocks") 406 419 defer span.End() 407 420 ··· 425 438 } 426 439 427 440 // inner loop part of compactBucket 428 - func (cs *CarStore) iterateShardBlocks(ctx context.Context, sh *CarShard, cb func(blk blockformat.Block) error) error { 441 + func (cs *FileCarStore) iterateShardBlocks(ctx context.Context, sh *CarShard, cb func(blk blockformat.Block) error) error { 429 442 fi, err := os.Open(sh.Path) 430 443 if err != nil { 431 444 return err ··· 528 541 func fnameForShard(user models.Uid, seq int) string { 529 542 return fmt.Sprintf("sh-%d-%d", user, seq) 530 543 } 531 - func (cs *CarStore) openNewShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) { 544 + func (cs *FileCarStore) openNewShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) { 532 545 // TODO: some overwrite protections 533 546 fname := filepath.Join(cs.rootDir, fnameForShard(user, seq)) 534 547 fi, err := os.Create(fname) ··· 539 552 return fi, fname, nil 540 553 } 541 554 542 - func (cs *CarStore) writeNewShardFile(ctx context.Context, user models.Uid, seq int, data []byte) (string, error) { 555 + func (cs *FileCarStore) writeNewShardFile(ctx context.Context, user models.Uid, seq int, data []byte) (string, error) { 543 556 _, span := otel.Tracer("carstore").Start(ctx, "writeNewShardFile") 544 557 defer span.End() 545 558 ··· 552 565 return fname, nil 553 566 } 554 567 555 - func (cs *CarStore) deleteShardFile(ctx context.Context, sh *CarShard) error { 568 + func (cs *FileCarStore) deleteShardFile(ctx context.Context, sh *CarShard) error { 556 569 return os.Remove(sh.Path) 557 570 } 558 571 ··· 587 600 return hnw, nil 588 601 } 589 602 590 - func (cs *CarStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) { 603 + func (cs *FileCarStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) { 591 604 592 605 buf := new(bytes.Buffer) 593 606 hnw, err := WriteCarHeader(buf, root) ··· 646 659 return buf.Bytes(), nil 647 660 } 648 661 649 - func (cs *CarStore) putShard(ctx context.Context, shard *CarShard, brefs []map[string]any, rmcids map[cid.Cid]bool, nocache bool) error { 662 + func (cs *FileCarStore) putShard(ctx context.Context, shard *CarShard, brefs []map[string]any, rmcids map[cid.Cid]bool, nocache bool) error { 650 663 ctx, span := otel.Tracer("carstore").Start(ctx, "putShard") 651 664 defer span.End() 652 665 ··· 726 739 return dropset, nil 727 740 } 728 741 729 - func (cs *CarStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) { 742 + func (cs *FileCarStore) ImportSlice(ctx context.Context, uid models.Uid, since *string, carslice []byte) (cid.Cid, *DeltaSession, error) { 730 743 ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice") 731 744 defer span.End() 732 745 ··· 774 787 return nil 775 788 } 776 789 777 - func (cs *CarStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) { 790 + func (cs *FileCarStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) { 778 791 lastShard, err := cs.getLastShard(ctx, user) 779 792 if err != nil { 780 793 return cid.Undef, err ··· 786 799 return lastShard.Root.CID, nil 787 800 } 788 801 789 - func (cs *CarStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) { 802 + func (cs *FileCarStore) GetUserRepoRev(ctx context.Context, user models.Uid) (string, error) { 790 803 lastShard, err := cs.getLastShard(ctx, user) 791 804 if err != nil { 792 805 return "", err ··· 804 817 Created time.Time 805 818 } 806 819 807 - func (cs *CarStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) { 820 + func (cs *FileCarStore) Stat(ctx context.Context, usr models.Uid) ([]UserStat, error) { 808 821 shards, err := cs.meta.GetUserShards(ctx, usr) 809 822 if err != nil { 810 823 return nil, err ··· 822 835 return out, nil 823 836 } 824 837 825 - func (cs *CarStore) WipeUserData(ctx context.Context, user models.Uid) error { 838 + func (cs *FileCarStore) WipeUserData(ctx context.Context, user models.Uid) error { 826 839 shards, err := cs.meta.GetUserShards(ctx, user) 827 840 if err != nil { 828 841 return err ··· 839 852 return nil 840 853 } 841 854 842 - func (cs *CarStore) deleteShards(ctx context.Context, shs []CarShard) error { 855 + func (cs *FileCarStore) deleteShards(ctx context.Context, shs []CarShard) error { 843 856 ctx, span := otel.Tracer("carstore").Start(ctx, "deleteShards") 844 857 defer span.End() 845 858 ··· 965 978 return len(cb.shards) == 0 966 979 } 967 980 968 - func (cs *CarStore) openNewCompactedShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) { 981 + func (cs *FileCarStore) openNewCompactedShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) { 969 982 // TODO: some overwrite protections 970 983 // NOTE CreateTemp is used for creating a non-colliding file, but we keep it and don't delete it so don't think of it as "temporary". 971 984 // This creates "sh-%d-%d%s" with some random stuff in the last position ··· 982 995 NumShards int 983 996 } 984 997 985 - func (cs *CarStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) { 998 + func (cs *FileCarStore) GetCompactionTargets(ctx context.Context, shardCount int) ([]CompactionTarget, error) { 986 999 ctx, span := otel.Tracer("carstore").Start(ctx, "GetCompactionTargets") 987 1000 defer span.End() 988 1001 ··· 990 1003 } 991 1004 992 1005 // getBlockRefsForShards is a prep function for CompactUserShards 993 - func (cs *CarStore) getBlockRefsForShards(ctx context.Context, shardIds []uint) ([]blockRef, error) { 1006 + func (cs *FileCarStore) getBlockRefsForShards(ctx context.Context, shardIds []uint) ([]blockRef, error) { 994 1007 ctx, span := otel.Tracer("carstore").Start(ctx, "getBlockRefsForShards") 995 1008 defer span.End() 996 1009 ··· 1028 1041 DupeCount int `json:"dupeCount"` 1029 1042 } 1030 1043 1031 - func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) { 1044 + func (cs *FileCarStore) CompactUserShards(ctx context.Context, user models.Uid, skipBigShards bool) (*CompactionStats, error) { 1032 1045 ctx, span := otel.Tracer("carstore").Start(ctx, "CompactUserShards") 1033 1046 defer span.End() 1034 1047 ··· 1242 1255 return stats, nil 1243 1256 } 1244 1257 1245 - func (cs *CarStore) deleteStaleRefs(ctx context.Context, uid models.Uid, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) error { 1258 + func (cs *FileCarStore) deleteStaleRefs(ctx context.Context, uid models.Uid, brefs []blockRef, staleRefs []staleRef, removedShards map[uint]bool) error { 1246 1259 ctx, span := otel.Tracer("carstore").Start(ctx, "deleteStaleRefs") 1247 1260 defer span.End() 1248 1261 ··· 1277 1290 return cs.meta.SetStaleRef(ctx, uid, staleToKeep) 1278 1291 } 1279 1292 1280 - func (cs *CarStore) compactBucket(ctx context.Context, user models.Uid, b *compBucket, shardsById map[uint]CarShard, keep map[cid.Cid]bool) error { 1293 + func (cs *FileCarStore) compactBucket(ctx context.Context, user models.Uid, b *compBucket, shardsById map[uint]CarShard, keep map[cid.Cid]bool) error { 1281 1294 ctx, span := otel.Tracer("carstore").Start(ctx, "compactBucket") 1282 1295 defer span.End() 1283 1296
+2 -2
carstore/repo_test.go
··· 24 24 "gorm.io/gorm" 25 25 ) 26 26 27 - func testCarStore() (*CarStore, func(), error) { 27 + func testCarStore() (CarStore, func(), error) { 28 28 tempdir, err := os.MkdirTemp("", "msttest-") 29 29 if err != nil { 30 30 return nil, nil, err ··· 250 250 checkRepo(t, cs, buf, recs) 251 251 } 252 252 253 - func checkRepo(t *testing.T, cs *CarStore, r io.Reader, expRecs []cid.Cid) { 253 + func checkRepo(t *testing.T, cs CarStore, r io.Reader, expRecs []cid.Cid) { 254 254 t.Helper() 255 255 rep, err := repo.ReadRepoFromCar(context.TODO(), r) 256 256 if err != nil {
+2 -2
events/dbpersist.go
··· 51 51 type DbPersistence struct { 52 52 db *gorm.DB 53 53 54 - cs *carstore.CarStore 54 + cs carstore.CarStore 55 55 56 56 lk sync.Mutex 57 57 ··· 86 86 Ops []byte 87 87 } 88 88 89 - func NewDbPersistence(db *gorm.DB, cs *carstore.CarStore, options *Options) (*DbPersistence, error) { 89 + func NewDbPersistence(db *gorm.DB, cs carstore.CarStore, options *Options) (*DbPersistence, error) { 90 90 if err := db.AutoMigrate(&RepoEventRecord{}); err != nil { 91 91 return nil, err 92 92 }
+1 -1
events/dbpersist_test.go
··· 268 268 } 269 269 } 270 270 271 - func setupDBs(t testing.TB) (*gorm.DB, *gorm.DB, *carstore.CarStore, string, error) { 271 + func setupDBs(t testing.TB) (*gorm.DB, *gorm.DB, carstore.CarStore, string, error) { 272 272 dir, err := os.MkdirTemp("", "integtest") 273 273 if err != nil { 274 274 return nil, nil, nil, "", err
+3 -3
events/diskpersist_test.go
··· 187 187 188 188 } 189 189 190 - func runPersisterBenchmark(b *testing.B, cs *carstore.CarStore, db *gorm.DB, p events.EventPersistence) { 190 + func runPersisterBenchmark(b *testing.B, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) { 191 191 ctx := context.Background() 192 192 193 193 db.AutoMigrate(&pds.User{}) ··· 302 302 runEventManagerTest(t, cs, db, dp) 303 303 } 304 304 305 - func runEventManagerTest(t *testing.T, cs *carstore.CarStore, db *gorm.DB, p events.EventPersistence) { 305 + func runEventManagerTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) { 306 306 ctx := context.Background() 307 307 308 308 db.AutoMigrate(&pds.User{}) ··· 409 409 runTakedownTest(t, cs, db, dp) 410 410 } 411 411 412 - func runTakedownTest(t *testing.T, cs *carstore.CarStore, db *gorm.DB, p events.EventPersistence) { 412 + func runTakedownTest(t *testing.T, cs carstore.CarStore, db *gorm.DB, p events.EventPersistence) { 413 413 ctx := context.TODO() 414 414 415 415 db.AutoMigrate(&pds.User{})
+1 -1
pds/handlers_test.go
··· 17 17 "gorm.io/gorm" 18 18 ) 19 19 20 - func testCarStore(t *testing.T, db *gorm.DB) (*carstore.CarStore, func()) { 20 + func testCarStore(t *testing.T, db *gorm.DB) (carstore.CarStore, func()) { 21 21 t.Helper() 22 22 tempdir, err := os.MkdirTemp("", "msttest-") 23 23 if err != nil {
+2 -2
pds/server.go
··· 41 41 42 42 type Server struct { 43 43 db *gorm.DB 44 - cs *carstore.CarStore 44 + cs carstore.CarStore 45 45 repoman *repomgr.RepoManager 46 46 feedgen *FeedGenerator 47 47 notifman notifs.NotificationManager ··· 65 65 // NewServer. 66 66 const serverListenerBootTimeout = 5 * time.Second 67 67 68 - func NewServer(db *gorm.DB, cs *carstore.CarStore, serkey *did.PrivKey, handleSuffix, serviceUrl string, didr plc.PLCClient, jwtkey []byte) (*Server, error) { 68 + func NewServer(db *gorm.DB, cs carstore.CarStore, serkey *did.PrivKey, handleSuffix, serviceUrl string, didr plc.PLCClient, jwtkey []byte) (*Server, error) { 69 69 db.AutoMigrate(&User{}) 70 70 db.AutoMigrate(&Peering{}) 71 71
+2 -2
repomgr/ingest_test.go
··· 69 69 } 70 70 } 71 71 72 - func testCarstore(t *testing.T, dir string) *carstore.CarStore { 72 + func testCarstore(t *testing.T, dir string) carstore.CarStore { 73 73 cardb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "car.sqlite"))) 74 74 if err != nil { 75 75 t.Fatal(err) ··· 151 151 } 152 152 } 153 153 154 - func doPost(t *testing.T, cs *carstore.CarStore, did string, prev *string, postid int) ([]byte, cid.Cid, string, string) { 154 + func doPost(t *testing.T, cs carstore.CarStore, did string, prev *string, postid int) ([]byte, cid.Cid, string, string) { 155 155 ctx := context.TODO() 156 156 ds, err := cs.NewDeltaSession(ctx, 1, prev) 157 157 if err != nil {
+3 -3
repomgr/repomgr.go
··· 33 33 34 34 var log = logging.Logger("repomgr") 35 35 36 - func NewRepoManager(cs *carstore.CarStore, kmgr KeyManager) *RepoManager { 36 + func NewRepoManager(cs carstore.CarStore, kmgr KeyManager) *RepoManager { 37 37 38 38 return &RepoManager{ 39 39 cs: cs, ··· 53 53 } 54 54 55 55 type RepoManager struct { 56 - cs *carstore.CarStore 56 + cs carstore.CarStore 57 57 kmgr KeyManager 58 58 59 59 lklk sync.Mutex ··· 140 140 } 141 141 } 142 142 143 - func (rm *RepoManager) CarStore() *carstore.CarStore { 143 + func (rm *RepoManager) CarStore() carstore.CarStore { 144 144 return rm.cs 145 145 } 146 146