this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

implement first pass at compaction of carstore shards (#299)

authored by

Whyrusleeping and committed by
GitHub
4a061917 4f03bd9e

+612 -103
+35
bgs/admin.go
··· 2 2 3 3 import ( 4 4 "errors" 5 + "fmt" 5 6 "net/http" 6 7 "strconv" 7 8 "strings" ··· 399 400 "success": "true", 400 401 }) 401 402 } 403 + 404 + func (bgs *BGS) handleAdminCompactRepo(e echo.Context) error { 405 + ctx := e.Request().Context() 406 + 407 + did := e.QueryParam("did") 408 + if did == "" { 409 + return fmt.Errorf("must pass a did") 410 + } 411 + 412 + u, err := bgs.lookupUserByDid(ctx, did) 413 + if err != nil { 414 + return fmt.Errorf("no such user: %w", err) 415 + } 416 + 417 + if err := bgs.repoman.CarStore().CompactUserShards(ctx, u.ID); err != nil { 418 + return fmt.Errorf("compaction failed: %w", err) 419 + } 420 + 421 + return e.JSON(200, map[string]any{ 422 + "success": "true", 423 + }) 424 + } 425 + 426 + func (bgs *BGS) handleAdminCompactAllRepos(e echo.Context) error { 427 + ctx := e.Request().Context() 428 + 429 + if err := bgs.runRepoCompaction(ctx); err != nil { 430 + return fmt.Errorf("compaction run failed: %w", err) 431 + } 432 + 433 + return e.JSON(200, map[string]any{ 434 + "success": "true", 435 + }) 436 + }
+21
bgs/bgs.go
··· 305 305 // Repo-related Admin API 306 306 admin.POST("/repo/takeDown", bgs.handleAdminTakeDownRepo) 307 307 admin.POST("/repo/reverseTakedown", bgs.handleAdminReverseTakedown) 308 + admin.POST("/repo/compact", bgs.handleAdminCompactRepo) 309 + admin.POST("/repo/compactAll", bgs.handleAdminCompactAllRepos) 308 310 309 311 // PDS-related Admin API 310 312 admin.GET("/pds/list", bgs.handleListPDSs) ··· 1029 1031 1030 1032 return nil 1031 1033 } 1034 + 1035 + func (bgs *BGS) runRepoCompaction(ctx context.Context) error { 1036 + ctx, span := otel.Tracer("bgs").Start(ctx, "runRepoCompaction") 1037 + defer span.End() 1038 + 1039 + repos, err := bgs.repoman.CarStore().GetCompactionTargets(ctx) 1040 + if err != nil { 1041 + return fmt.Errorf("failed to get repos to compact: %w", err) 1042 + } 1043 + 1044 + for _, r := range repos { 1045 + if err := bgs.repoman.CarStore().CompactUserShards(ctx, r.Usr); err != nil { 1046 + log.Errorf("failed to compact shards for user %d: %s", r.Usr, err) 1047 + continue 1048 + } 1049 + } 1050 + 1051 + return nil 1052 + }
+421 -95
carstore/bs.go
··· 9 9 "io" 10 10 "os" 11 11 "path/filepath" 12 + "sort" 12 13 "strings" 13 14 "sync" 14 15 "time" ··· 26 27 carutil "github.com/ipld/go-car/util" 27 28 cbg "github.com/whyrusleeping/cbor-gen" 28 29 "go.opentelemetry.io/otel" 30 + "go.opentelemetry.io/otel/attribute" 29 31 "gorm.io/gorm" 30 32 ) 31 33 ··· 52 54 if err := meta.AutoMigrate(&CarShard{}, &blockRef{}); err != nil { 53 55 return nil, err 54 56 } 57 + if err := meta.AutoMigrate(&staleRef{}); err != nil { 58 + return nil, err 59 + } 60 + 55 61 return &CarStore{ 56 62 meta: meta, 57 63 rootDir: root, ··· 73 79 Seq int `gorm:"index:idx_car_shards_seq;index:idx_car_shards_usr_seq,priority:2,sort:desc"` 74 80 Path string 75 81 Usr models.Uid `gorm:"index:idx_car_shards_usr;index:idx_car_shards_usr_seq,priority:1"` 76 - Rebase bool 77 82 Rev string 78 83 } 79 84 80 85 type blockRef struct { 81 86 ID uint `gorm:"primarykey"` 82 87 Cid models.DbCID `gorm:"index"` 83 - Shard uint 88 + Shard uint `gorm:"index"` 84 89 Offset int64 85 - Dirty bool 86 90 //User uint `gorm:"index"` 91 + } 92 + 93 + type staleRef struct { 94 + ID uint `gorm:"primarykey"` 95 + Cid models.DbCID `gorm:"index"` 96 + Usr models.Uid 87 97 } 88 98 89 99 type userView struct { ··· 262 272 return nil 263 273 } 264 274 265 - func (cs *CarStore) putLastShardCache(user models.Uid, ls *CarShard) { 275 + func (cs *CarStore) putLastShardCache(ls *CarShard) { 266 276 cs.lscLk.Lock() 267 277 defer cs.lscLk.Unlock() 268 278 269 - cs.lastShardCache[user] = ls 279 + cs.lastShardCache[ls.Usr] = ls 270 280 } 271 281 272 282 func (cs *CarStore) getLastShard(ctx context.Context, user models.Uid) (*CarShard, error) { ··· 288 298 //} 289 299 } 290 300 291 - cs.putLastShardCache(user, &lastShard) 301 + cs.putLastShardCache(&lastShard) 292 302 return &lastShard, nil 293 303 } 294 304 ··· 375 385 } 376 386 377 387 for _, sh := range shards { 378 - // for rebase shards, only include the modified root, not the whole tree 379 - if sh.Rebase && incremental { 380 - if err := cs.writeBlockFromShard(ctx, &sh, w, sh.Root.CID); err != nil { 381 - return err 382 - } 383 - } else { 384 - if err := cs.writeShardBlocks(ctx, &sh, w); err != nil { 385 - return err 386 - } 388 + if err := cs.writeShardBlocks(ctx, &sh, w); err != nil { 389 + return err 387 390 } 388 391 } 389 392 ··· 438 441 } 439 442 } 440 443 444 + func (cs *CarStore) iterateShardBlocks(ctx context.Context, sh *CarShard, cb func(blk blockformat.Block) error) error { 445 + fi, err := os.Open(sh.Path) 446 + if err != nil { 447 + return err 448 + } 449 + defer fi.Close() 450 + 451 + rr, err := car.NewCarReader(fi) 452 + if err != nil { 453 + return err 454 + } 455 + 456 + for { 457 + blk, err := rr.Next() 458 + if err != nil { 459 + if err == io.EOF { 460 + return nil 461 + } 462 + return err 463 + } 464 + 465 + if err := cb(blk); err != nil { 466 + return err 467 + } 468 + } 469 + } 470 + 441 471 var _ blockstore.Blockstore = (*DeltaSession)(nil) 442 472 443 473 func (ds *DeltaSession) BaseCid() cid.Cid { ··· 539 569 } 540 570 541 571 func (cs *CarStore) deleteShardFile(ctx context.Context, sh *CarShard) error { 542 - return os.Remove(fnameForShard(sh.Usr, sh.Seq)) 572 + return os.Remove(sh.Path) 543 573 } 544 574 545 575 // CloseWithRoot writes all new blocks in a car file to the writer with the 546 576 // given cid as the 'root' 547 577 func (ds *DeltaSession) CloseWithRoot(ctx context.Context, root cid.Cid, rev string) ([]byte, error) { 548 - return ds.closeWithRoot(ctx, root, rev, false) 578 + ctx, span := otel.Tracer("carstore").Start(ctx, "CloseWithRoot") 579 + defer span.End() 580 + 581 + if ds.readonly { 582 + return nil, fmt.Errorf("cannot write to readonly deltaSession") 583 + } 584 + 585 + return ds.cs.writeNewShard(ctx, root, rev, ds.user, ds.seq, ds.blks, ds.rmcids) 549 586 } 550 587 551 588 func WriteCarHeader(w io.Writer, root cid.Cid) (int64, error) { ··· 566 603 return hnw, nil 567 604 } 568 605 569 - func (ds *DeltaSession) closeWithRoot(ctx context.Context, root cid.Cid, rev string, rebase bool) ([]byte, error) { 570 - ctx, span := otel.Tracer("carstore").Start(ctx, "CloseWithRoot") 571 - defer span.End() 572 - 573 - if ds.readonly { 574 - return nil, fmt.Errorf("cannot write to readonly deltaSession") 575 - } 606 + func (cs *CarStore) writeNewShard(ctx context.Context, root cid.Cid, rev string, user models.Uid, seq int, blks map[cid.Cid]blockformat.Block, rmcids map[cid.Cid]bool) ([]byte, error) { 576 607 577 608 buf := new(bytes.Buffer) 578 609 hnw, err := WriteCarHeader(buf, root) ··· 586 617 587 618 offset := hnw 588 619 //brefs := make([]*blockRef, 0, len(ds.blks)) 589 - brefs := make([]map[string]interface{}, 0, len(ds.blks)) 590 - for k, blk := range ds.blks { 620 + brefs := make([]map[string]interface{}, 0, len(blks)) 621 + for k, blk := range blks { 591 622 nw, err := LdWrite(buf, k.Bytes(), blk.RawData()) 592 623 if err != nil { 593 624 return nil, fmt.Errorf("failed to write block: %w", err) ··· 610 641 offset += nw 611 642 } 612 643 613 - path, err := ds.cs.writeNewShardFile(ctx, ds.user, ds.seq, buf.Bytes()) 644 + path, err := cs.writeNewShardFile(ctx, user, seq, buf.Bytes()) 614 645 if err != nil { 615 646 return nil, fmt.Errorf("failed to write shard file: %w", err) 616 647 } ··· 618 649 shard := CarShard{ 619 650 Root: models.DbCID{root}, 620 651 DataStart: hnw, 621 - Seq: ds.seq, 652 + Seq: seq, 622 653 Path: path, 623 - Usr: ds.user, 654 + Usr: user, 624 655 Rev: rev, 625 656 } 626 657 627 - if err := ds.putShard(ctx, &shard, brefs); err != nil { 658 + if err := cs.putShard(ctx, &shard, brefs, rmcids, false); err != nil { 628 659 return nil, err 629 660 } 630 661 631 662 return buf.Bytes(), nil 632 663 } 633 664 634 - func (ds *DeltaSession) putShard(ctx context.Context, shard *CarShard, brefs []map[string]any) error { 665 + func (cs *CarStore) putShard(ctx context.Context, shard *CarShard, brefs []map[string]any, rmcids map[cid.Cid]bool, nocache bool) error { 635 666 ctx, span := otel.Tracer("carstore").Start(ctx, "putShard") 636 667 defer span.End() 637 668 638 669 // TODO: there should be a way to create the shard and block_refs that 639 670 // reference it in the same query, would save a lot of time 640 - tx := ds.cs.meta.WithContext(ctx).Begin() 671 + tx := cs.meta.WithContext(ctx).Begin() 641 672 642 673 if err := tx.WithContext(ctx).Create(shard).Error; err != nil { 643 674 return fmt.Errorf("failed to create shard in DB tx: %w", err) 644 675 } 645 - ds.cs.putLastShardCache(ds.user, shard) 676 + 677 + if !nocache { 678 + cs.putLastShardCache(shard) 679 + } 646 680 647 681 for _, ref := range brefs { 648 682 ref["shard"] = shard.ID ··· 652 686 return fmt.Errorf("failed to create block refs: %w", err) 653 687 } 654 688 655 - if len(ds.rmcids) > 0 { 656 - var torm []models.DbCID 657 - for c := range ds.rmcids { 658 - torm = append(torm, models.DbCID{c}) 689 + if len(rmcids) > 0 { 690 + var torm []staleRef 691 + for c := range rmcids { 692 + torm = append(torm, staleRef{ 693 + Cid: models.DbCID{c}, 694 + Usr: shard.Usr, 695 + }) 659 696 } 660 697 661 - subq := ds.cs.meta.Model(&blockRef{}).Joins("left join car_shards cs on cs.id = block_refs.shard").Where("cid in (?) AND usr = ?", torm, ds.user).Select("block_refs.id") 662 - if err := tx.Model(&blockRef{}).Where("id in (?)", subq).UpdateColumn("dirty", true).Error; err != nil { 698 + if err := tx.Create(torm).Error; err != nil { 663 699 return err 664 700 } 665 701 } ··· 715 751 return nil 716 752 } 717 753 718 - func (ds *DeltaSession) CloseAsRebase(ctx context.Context, root cid.Cid, rev string) error { 719 - _, err := ds.closeWithRoot(ctx, root, rev, true) 720 - if err != nil { 721 - return err 722 - } 723 - 724 - // TODO: this *could* get large, might be worth doing it incrementally 725 - var oldslices []CarShard 726 - if err := ds.cs.meta.Find(&oldslices, "usr = ? AND seq < ?", ds.user, ds.seq).Error; err != nil { 727 - return err 728 - } 729 - 730 - // If anything here fails, cleanup is straightforward. Simply look for any 731 - // shard in the database with a higher seq shard marked as 'rebase' 732 - for _, sl := range oldslices { 733 - if err := os.Remove(sl.Path); err != nil { 734 - if !os.IsNotExist(err) { 735 - return err 736 - } 737 - } 738 - 739 - if err := ds.cs.meta.Delete(&sl).Error; err != nil { 740 - return err 741 - } 742 - 743 - if err := ds.cs.meta.Where("shard = ?", sl.ID).Delete(&blockRef{}).Error; err != nil { 744 - return err 745 - } 746 - } 747 - 748 - return nil 749 - } 750 - 751 754 func LdWrite(w io.Writer, d ...[]byte) (int64, error) { 752 755 var sum uint64 753 756 for _, s := range d { ··· 781 784 return out 782 785 } 783 786 784 - func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, newcids []cid.Cid) (map[cid.Cid]bool, error) { 787 + func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, newcids map[cid.Cid]blockformat.Block) (map[cid.Cid]bool, error) { 785 788 ctx, span := otel.Tracer("repo").Start(ctx, "BlockDiff") 786 789 defer span.End() 787 790 ··· 791 794 792 795 // walk the entire 'new' portion of the tree, marking all referenced cids as 'keep' 793 796 keepset := make(map[cid.Cid]bool) 794 - for _, c := range newcids { 797 + for c := range newcids { 795 798 keepset[c] = true 796 799 oblk, err := bs.Get(ctx, c) 797 800 if err != nil { ··· 872 875 } 873 876 } 874 877 875 - rmcids, err := BlockDiff(ctx, ds, ds.baseCid, cids) 878 + rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks) 876 879 if err != nil { 877 880 return cid.Undef, nil, fmt.Errorf("block diff failed (base=%s): %w", ds.baseCid, err) 878 881 } ··· 882 885 return carr.Header.Roots[0], ds, nil 883 886 } 884 887 888 + func (ds *DeltaSession) CalcDiff(ctx context.Context, nroot cid.Cid) error { 889 + rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks) 890 + if err != nil { 891 + return fmt.Errorf("block diff failed: %w", err) 892 + } 893 + 894 + ds.rmcids = rmcids 895 + return nil 896 + } 897 + 885 898 func (cs *CarStore) GetUserRepoHead(ctx context.Context, user models.Uid) (cid.Cid, error) { 886 899 lastShard, err := cs.getLastShard(ctx, user) 887 900 if err != nil { ··· 930 943 return out, nil 931 944 } 932 945 933 - func (cs *CarStore) checkFork(ctx context.Context, user models.Uid, prev cid.Cid) (bool, error) { 934 - ctx, span := otel.Tracer("carstore").Start(ctx, "checkFork") 935 - defer span.End() 946 + func (cs *CarStore) TakeDownRepo(ctx context.Context, user models.Uid) error { 947 + var shards []CarShard 948 + if err := cs.meta.Find(&shards, "usr = ?", user).Error; err != nil { 949 + return err 950 + } 951 + 952 + for _, sh := range shards { 953 + if err := cs.deleteShard(ctx, &sh); err != nil { 954 + if !os.IsNotExist(err) { 955 + return err 956 + } 957 + } 958 + } 959 + 960 + if err := cs.meta.Delete(&CarShard{}, "usr = ?", user).Error; err != nil { 961 + return err 962 + } 963 + 964 + return nil 965 + } 966 + 967 + func (cs *CarStore) deleteShard(ctx context.Context, sh *CarShard) error { 968 + if err := cs.meta.Delete(&CarShard{}, "id = ?", sh.ID).Error; err != nil { 969 + return err 970 + } 971 + 972 + if err := cs.meta.Delete(&blockRef{}, "shard = ?", sh.ID).Error; err != nil { 973 + return err 974 + } 975 + 976 + return cs.deleteShardFile(ctx, sh) 977 + } 936 978 937 - lastShard, err := cs.getLastShard(ctx, user) 979 + type shardStat struct { 980 + ID uint 981 + Seq int 982 + Dirty int 983 + Total int 984 + 985 + refs []blockRef 986 + } 987 + 988 + func (s shardStat) dirtyFrac() float64 { 989 + return float64(s.Dirty) / float64(s.Total) 990 + } 991 + 992 + func shouldCompact(s shardStat) bool { 993 + // if shard is mostly removed blocks 994 + if s.dirtyFrac() > 0.5 { 995 + return true 996 + } 997 + 998 + // if its a big shard with a sufficient number of removed blocks 999 + if s.Dirty > 1000 { 1000 + return true 1001 + } 1002 + 1003 + // if its just rather small and we want to compact it up with other shards 1004 + if s.Total < 20 { 1005 + return true 1006 + } 1007 + 1008 + return false 1009 + } 1010 + 1011 + func aggrRefs(brefs []blockRef, staleCids map[cid.Cid]bool) []shardStat { 1012 + byId := make(map[uint]*shardStat) 1013 + 1014 + for _, br := range brefs { 1015 + s, ok := byId[br.Shard] 1016 + if !ok { 1017 + s = &shardStat{ 1018 + ID: br.Shard, 1019 + } 1020 + byId[br.Shard] = s 1021 + } 1022 + 1023 + s.Total++ 1024 + if staleCids[br.Cid.CID] { 1025 + s.Dirty++ 1026 + } 1027 + 1028 + s.refs = append(s.refs, br) 1029 + } 1030 + 1031 + var out []shardStat 1032 + for _, s := range byId { 1033 + out = append(out, *s) 1034 + } 1035 + 1036 + sort.Slice(out, func(i, j int) bool { 1037 + return out[i].ID < out[j].ID 1038 + }) 1039 + 1040 + return out 1041 + } 1042 + 1043 + type compBucket struct { 1044 + shards []shardStat 1045 + 1046 + cleanBlocks int 1047 + } 1048 + 1049 + func (cb *compBucket) addShardStat(ss shardStat) { 1050 + cb.cleanBlocks += (ss.Total - ss.Dirty) 1051 + cb.shards = append(cb.shards, ss) 1052 + } 1053 + 1054 + func (cb *compBucket) isEmpty() bool { 1055 + return len(cb.shards) == 0 1056 + } 1057 + 1058 + func (cs *CarStore) copyShardBlocksFiltered(ctx context.Context, sh *CarShard, w io.Writer, keep map[cid.Cid]bool) error { 1059 + fi, err := os.Open(sh.Path) 938 1060 if err != nil { 939 - return false, err 1061 + return err 940 1062 } 1063 + defer fi.Close() 941 1064 942 - var maybeShard CarShard 943 - if err := cs.meta.WithContext(ctx).Model(CarShard{}).Find(&maybeShard, "usr = ? AND root = ?", user, &models.DbCID{prev}).Error; err != nil { 944 - return false, err 1065 + rr, err := car.NewCarReader(fi) 1066 + if err != nil { 1067 + return err 1068 + } 1069 + 1070 + for { 1071 + blk, err := rr.Next() 1072 + if err != nil { 1073 + return err 1074 + } 1075 + 1076 + if keep[blk.Cid()] { 1077 + _, err := LdWrite(w, blk.Cid().Bytes(), blk.RawData()) 1078 + return err 1079 + } 945 1080 } 1081 + } 946 1082 947 - if maybeShard.ID != 0 && maybeShard.ID == lastShard.ID { 948 - // somehow we are checking if a valid 'append' is a fork, seems buggy, throw an error 949 - return false, fmt.Errorf("invariant broken: checked for forkiness of a valid append (%d - %d)", lastShard.ID, maybeShard.ID) 1083 + func (cs *CarStore) openNewCompactedShardFile(ctx context.Context, user models.Uid, seq int) (*os.File, string, error) { 1084 + // TODO: some overwrite protections 1085 + fi, err := os.CreateTemp(cs.rootDir, fnameForShard(user, seq)) 1086 + if err != nil { 1087 + return nil, "", err 950 1088 } 951 1089 952 - if maybeShard.ID == 0 { 953 - return false, nil 1090 + return fi, fi.Name(), nil 1091 + } 1092 + 1093 + type CompactionTarget struct { 1094 + Usr models.Uid 1095 + NumShards int 1096 + } 1097 + 1098 + func (cs *CarStore) GetCompactionTargets(ctx context.Context) ([]CompactionTarget, error) { 1099 + ctx, span := otel.Tracer("carstore").Start(ctx, "GetCompactionTargets") 1100 + defer span.End() 1101 + 1102 + var targets []CompactionTarget 1103 + if err := cs.meta.Raw(`select usr, count(*) as num_shards from car_shards group by usr having count(*) > 50 order by num_shards desc`).Scan(&targets).Error; err != nil { 1104 + return nil, err 954 1105 } 955 1106 956 - return true, nil 1107 + return targets, nil 957 1108 } 958 1109 959 - func (cs *CarStore) TakeDownRepo(ctx context.Context, user models.Uid) error { 1110 + func (cs *CarStore) CompactUserShards(ctx context.Context, user models.Uid) error { 1111 + ctx, span := otel.Tracer("carstore").Start(ctx, "CompactUserShards") 1112 + defer span.End() 1113 + 1114 + span.SetAttributes(attribute.Int64("user", int64(user))) 1115 + 960 1116 var shards []CarShard 961 1117 if err := cs.meta.Find(&shards, "usr = ?", user).Error; err != nil { 962 1118 return err 963 1119 } 964 1120 965 - for _, sh := range shards { 966 - if err := cs.deleteShardFile(ctx, &sh); err != nil { 967 - if !os.IsNotExist(err) { 968 - return err 1121 + var shardIds []uint 1122 + for _, s := range shards { 1123 + shardIds = append(shardIds, s.ID) 1124 + } 1125 + 1126 + shardsById := make(map[uint]CarShard) 1127 + for _, s := range shards { 1128 + shardsById[s.ID] = s 1129 + } 1130 + 1131 + var brefs []blockRef 1132 + if err := cs.meta.Debug().Raw(`select * from block_refs where shard in (?)`, shardIds).Scan(&brefs).Error; err != nil { 1133 + return err 1134 + } 1135 + 1136 + var staleRefs []staleRef 1137 + if err := cs.meta.Debug().Find(&staleRefs, "usr = ?", user).Error; err != nil { 1138 + return err 1139 + } 1140 + 1141 + stale := make(map[cid.Cid]bool) 1142 + var hasDirtyDupes bool 1143 + for _, br := range staleRefs { 1144 + if stale[br.Cid.CID] { 1145 + hasDirtyDupes = true 1146 + break 1147 + } 1148 + stale[br.Cid.CID] = true 1149 + } 1150 + 1151 + if hasDirtyDupes { 1152 + // if we have no duplicates, then the keep set is simply all the 'clean' blockRefs 1153 + // in the case we have duplicate dirty references we have to compute 1154 + // the keep set by walking the entire repo to check if anything is 1155 + // still referencing the dirty block in question 1156 + 1157 + // we could also just add the duplicates to the keep set for now and 1158 + // focus on compacting everything else. it leaves *some* dirty blocks 1159 + // still around but we're doing that anyways since compaction isnt a 1160 + // perfect process 1161 + return fmt.Errorf("WIP: not currently handling this case") 1162 + } 1163 + 1164 + keep := make(map[cid.Cid]bool) 1165 + for _, br := range brefs { 1166 + if !stale[br.Cid.CID] { 1167 + keep[br.Cid.CID] = true 1168 + } 1169 + } 1170 + 1171 + results := aggrRefs(brefs, stale) 1172 + 1173 + thresholdForPosition := func(i int) int { 1174 + // TODO: calculate some curve here so earlier shards end up with more 1175 + // blocks and recent shards end up with less 1176 + return 50 1177 + } 1178 + 1179 + cur := new(compBucket) 1180 + var compactionQueue []*compBucket 1181 + for i, r := range results { 1182 + if shouldCompact(r) { 1183 + if cur.cleanBlocks > thresholdForPosition(i) { 1184 + compactionQueue = append(compactionQueue, cur) 1185 + cur = new(compBucket) 1186 + } 1187 + 1188 + cur.addShardStat(r) 1189 + } else { 1190 + if !cur.isEmpty() { 1191 + compactionQueue = append(compactionQueue, cur) 1192 + cur = new(compBucket) 969 1193 } 970 1194 } 971 1195 } 972 1196 973 - if err := cs.meta.Delete(&CarShard{}, "usr = ?", user).Error; err != nil { 1197 + if !cur.isEmpty() { 1198 + compactionQueue = append(compactionQueue, cur) 1199 + } 1200 + 1201 + removedShards := make(map[uint]bool) 1202 + for _, b := range compactionQueue { 1203 + if err := cs.compactBucket(ctx, user, b, shardsById, keep); err != nil { 1204 + return err 1205 + } 1206 + 1207 + for _, s := range b.shards { 1208 + removedShards[s.ID] = true 1209 + sh, ok := shardsById[s.ID] 1210 + if !ok { 1211 + return fmt.Errorf("missing shard to delete") 1212 + } 1213 + 1214 + if err := cs.deleteShard(ctx, &sh); err != nil { 1215 + return fmt.Errorf("deleting shard: %w", err) 1216 + } 1217 + } 1218 + } 1219 + 1220 + // now we need to delete the staleRefs we successfully cleaned up 1221 + // we can delete a staleRef if all the shards that have blockRefs with matching stale refs were processed 1222 + 1223 + brByCid := make(map[cid.Cid][]blockRef) 1224 + for _, br := range brefs { 1225 + brByCid[br.Cid.CID] = append(brByCid[br.Cid.CID], br) 1226 + } 1227 + 1228 + var staleToDelete []uint 1229 + for _, sr := range staleRefs { 1230 + brs := brByCid[sr.Cid.CID] 1231 + del := true 1232 + for _, br := range brs { 1233 + if !removedShards[br.Shard] { 1234 + del = false 1235 + break 1236 + } 1237 + } 1238 + 1239 + if del { 1240 + staleToDelete = append(staleToDelete, sr.ID) 1241 + } 1242 + } 1243 + 1244 + if err := cs.meta.Delete(&staleRef{}, "id in (?)", staleToDelete).Error; err != nil { 974 1245 return err 975 1246 } 976 1247 977 1248 return nil 978 1249 } 1250 + 1251 + func (cs *CarStore) compactBucket(ctx context.Context, user models.Uid, b *compBucket, shardsById map[uint]CarShard, keep map[cid.Cid]bool) error { 1252 + last := b.shards[len(b.shards)-1] 1253 + lastsh := shardsById[last.ID] 1254 + fi, path, err := cs.openNewCompactedShardFile(ctx, user, last.Seq) 1255 + if err != nil { 1256 + return err 1257 + } 1258 + 1259 + defer fi.Close() 1260 + root := lastsh.Root.CID 1261 + 1262 + hnw, err := WriteCarHeader(fi, root) 1263 + if err != nil { 1264 + return err 1265 + } 1266 + 1267 + offset := hnw 1268 + var nbrefs []map[string]any 1269 + for _, s := range b.shards { 1270 + sh := shardsById[s.ID] 1271 + if err := cs.iterateShardBlocks(ctx, &sh, func(blk blockformat.Block) error { 1272 + if keep[blk.Cid()] { 1273 + nw, err := LdWrite(fi, blk.Cid().Bytes(), blk.RawData()) 1274 + if err != nil { 1275 + return fmt.Errorf("failed to write block: %w", err) 1276 + } 1277 + 1278 + nbrefs = append(nbrefs, map[string]interface{}{ 1279 + "cid": models.DbCID{blk.Cid()}, 1280 + "offset": offset, 1281 + }) 1282 + 1283 + offset += nw 1284 + } 1285 + return nil 1286 + }); err != nil { 1287 + return err 1288 + } 1289 + } 1290 + 1291 + shard := CarShard{ 1292 + Root: models.DbCID{root}, 1293 + DataStart: hnw, 1294 + Seq: lastsh.Seq, 1295 + Path: path, 1296 + Usr: user, 1297 + Rev: lastsh.Rev, 1298 + } 1299 + 1300 + if err := cs.putShard(ctx, &shard, nbrefs, nil, true); err != nil { 1301 + return err 1302 + } 1303 + return nil 1304 + }
+135 -8
carstore/repo_test.go
··· 4 4 "bytes" 5 5 "context" 6 6 "fmt" 7 + "io" 7 8 "os" 8 9 "path/filepath" 9 10 "testing" ··· 92 93 t.Fatal(err) 93 94 } 94 95 96 + var recs []cid.Cid 95 97 head := ncid 96 98 for i := 0; i < 10; i++ { 97 99 ds, err := cs.NewDeltaSession(ctx, 1, &rev) ··· 104 106 t.Fatal(err) 105 107 } 106 108 107 - if _, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ 109 + rc, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ 108 110 Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()), 109 - }); err != nil { 111 + }) 112 + if err != nil { 110 113 t.Fatal(err) 111 114 } 115 + 116 + recs = append(recs, rc) 112 117 113 118 kmgr := &util.FakeKeyManager{} 114 119 nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) ··· 118 123 119 124 rev = nrev 120 125 126 + if err := ds.CalcDiff(ctx, nroot); err != nil { 127 + t.Fatal(err) 128 + } 129 + 121 130 if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { 122 131 t.Fatal(err) 123 132 } ··· 129 138 if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { 130 139 t.Fatal(err) 131 140 } 141 + checkRepo(t, buf, recs) 142 + 143 + if err := cs.CompactUserShards(ctx, 1); err != nil { 144 + t.Fatal(err) 145 + } 146 + 147 + buf = new(bytes.Buffer) 148 + if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { 149 + t.Fatal(err) 150 + } 151 + checkRepo(t, buf, recs) 152 + } 153 + 154 + func TestRepeatedCompactions(t *testing.T) { 155 + ctx := context.TODO() 156 + 157 + cs, cleanup, err := testCarStore() 158 + if err != nil { 159 + t.Fatal(err) 160 + } 161 + defer cleanup() 162 + 163 + ds, err := cs.NewDeltaSession(ctx, 1, nil) 164 + if err != nil { 165 + t.Fatal(err) 166 + } 167 + 168 + ncid, rev, err := setupRepo(ctx, ds) 169 + if err != nil { 170 + t.Fatal(err) 171 + } 172 + 173 + if _, err := ds.CloseWithRoot(ctx, ncid, rev); err != nil { 174 + t.Fatal(err) 175 + } 176 + 177 + var recs []cid.Cid 178 + head := ncid 179 + 180 + for loop := 0; loop < 50; loop++ { 181 + for i := 0; i < 100; i++ { 182 + ds, err := cs.NewDeltaSession(ctx, 1, &rev) 183 + if err != nil { 184 + t.Fatal(err) 185 + } 186 + 187 + rr, err := repo.OpenRepo(ctx, ds, head, true) 188 + if err != nil { 189 + t.Fatal(err) 190 + } 191 + 192 + rc, _, err := rr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ 193 + Text: fmt.Sprintf("hey look its a tweet %d", time.Now().UnixNano()), 194 + }) 195 + if err != nil { 196 + t.Fatal(err) 197 + } 198 + 199 + recs = append(recs, rc) 132 200 133 - fmt.Println(buf.Len()) 201 + kmgr := &util.FakeKeyManager{} 202 + nroot, nrev, err := rr.Commit(ctx, kmgr.SignForUser) 203 + if err != nil { 204 + t.Fatal(err) 205 + } 134 206 207 + rev = nrev 208 + 209 + if err := ds.CalcDiff(ctx, nroot); err != nil { 210 + t.Fatal(err) 211 + } 212 + 213 + if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { 214 + t.Fatal(err) 215 + } 216 + 217 + head = nroot 218 + } 219 + fmt.Println("Run compaction", loop) 220 + if err := cs.CompactUserShards(ctx, 1); err != nil { 221 + t.Fatal(err) 222 + } 223 + 224 + buf := new(bytes.Buffer) 225 + if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { 226 + t.Fatal(err) 227 + } 228 + checkRepo(t, buf, recs) 229 + } 230 + 231 + buf := new(bytes.Buffer) 232 + if err := cs.ReadUserCar(ctx, 1, "", true, buf); err != nil { 233 + t.Fatal(err) 234 + } 235 + checkRepo(t, buf, recs) 135 236 } 136 237 137 - func setupRepo(ctx context.Context, bs blockstore.Blockstore) (cid.Cid, string, error) { 138 - nr := repo.NewRepo(ctx, "did:foo", bs) 238 + func checkRepo(t *testing.T, r io.Reader, expRecs []cid.Cid) { 239 + rep, err := repo.ReadRepoFromCar(context.TODO(), r) 240 + if err != nil { 241 + t.Fatal(err) 242 + } 139 243 140 - if _, _, err := nr.CreateRecord(ctx, "app.bsky.feed.post", &appbsky.FeedPost{ 141 - Text: fmt.Sprintf("hey look its a tweet %s", time.Now()), 244 + set := make(map[cid.Cid]bool) 245 + for _, c := range expRecs { 246 + set[c] = true 247 + } 248 + 249 + if err := rep.ForEach(context.TODO(), "", func(k string, v cid.Cid) error { 250 + if !set[v] { 251 + return fmt.Errorf("have record we didnt expect") 252 + } 253 + 254 + delete(set, v) 255 + return nil 256 + 142 257 }); err != nil { 143 - return cid.Undef, "", err 258 + t.Fatal(err) 144 259 } 260 + 261 + if len(set) > 0 { 262 + t.Fatalf("expected to find more cids in repo: %v", set) 263 + } 264 + 265 + } 266 + 267 + func setupRepo(ctx context.Context, bs blockstore.Blockstore) (cid.Cid, string, error) { 268 + nr := repo.NewRepo(ctx, "did:foo", bs) 145 269 146 270 kmgr := &util.FakeKeyManager{} 147 271 ncid, rev, err := nr.Commit(ctx, kmgr.SignForUser) ··· 201 325 } 202 326 203 327 rev = nrev 328 + if err := ds.CalcDiff(ctx, nroot); err != nil { 329 + b.Fatal(err) 330 + } 204 331 205 332 if _, err := ds.CloseWithRoot(ctx, nroot, rev); err != nil { 206 333 b.Fatal(err)