this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

track no longer referenced blocks, delete refs (#280)

We won't need this until the sync changes that remove history land.

This adds tracking of which blocks are no longer referenced during
processing of a new commit. We then delete the carstore references for
those blocks when accepting that commit.
Work still to be done is figuring out how we want to compact old shard
files.

It seems relatively straightforward to do the compaction as a
post-process step that queries recently changed repos and checks if
enough block references have been dropped to make it worth running a
repo compaction. (in writing this, im thinking we might want a 'stale'
flag on blockRefs that we set to true instead of deleting them).
When compacting, to make the read paths more efficient, we should leave
the most recent several car slices alone and only compact the oldest N
shards. For efficiency on the write path it is advantageous to not have
individual files be too big, so that when reading data for making edits,
we can read a small number of shards whole.

authored by

Whyrusleeping and committed by
GitHub
ebc21133 0f8fc7bb

+247 -11
+95 -1
carstore/bs.go
··· 24 24 "github.com/ipfs/go-libipfs/blocks" 25 25 car "github.com/ipld/go-car" 26 26 carutil "github.com/ipld/go-car/util" 27 + cbg "github.com/whyrusleeping/cbor-gen" 27 28 "go.opentelemetry.io/otel" 28 29 "gorm.io/gorm" 29 30 ) ··· 80 81 Cid models.DbCID `gorm:"index"` 81 82 Shard uint 82 83 Offset int64 84 + Dirty bool 83 85 //User uint `gorm:"index"` 84 86 } 85 87 ··· 238 240 type DeltaSession struct { 239 241 fresh blockstore.Blockstore 240 242 blks map[cid.Cid]blockformat.Block 243 + rmcids map[cid.Cid]bool 241 244 base blockstore.Blockstore 242 245 user models.Uid 243 246 seq int ··· 631 634 return nil, fmt.Errorf("failed to write shard file: %w", err) 632 635 } 633 636 634 - // TODO: all this database work needs to be in a single transaction 635 637 shard := CarShard{ 636 638 Root: models.DbCID{root}, 637 639 DataStart: hnw, ··· 666 668 667 669 if err := createBlockRefs(ctx, tx, brefs); err != nil { 668 670 return fmt.Errorf("failed to create block refs: %w", err) 671 + } 672 + 673 + if len(ds.rmcids) > 0 { 674 + var torm []models.DbCID 675 + for c := range ds.rmcids { 676 + torm = append(torm, models.DbCID{c}) 677 + } 678 + 679 + subq := ds.cs.meta.Model(&blockRef{}).Joins("left join car_shards cs on cs.id = block_refs.shard").Where("cid in (?) AND usr = ?", torm, ds.user).Select("block_refs.id") 680 + if err := tx.Model(&blockRef{}).Where("id in (?)", subq).UpdateColumn("dirty", true).Error; err != nil { 681 + return err 682 + } 669 683 } 670 684 671 685 err := tx.WithContext(ctx).Commit().Error ··· 776 790 return int64(nw), nil 777 791 } 778 792 793 + func setToSlice(s map[cid.Cid]bool) []cid.Cid { 794 + out := make([]cid.Cid, 0, len(s)) 795 + for c := range s { 796 + out = append(out, c) 797 + } 798 + 799 + return out 800 + } 801 + 802 + func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, newcids []cid.Cid) (map[cid.Cid]bool, error) { 803 + ctx, span := otel.Tracer("repo").Start(ctx, "BlockDiff") 804 + defer span.End() 805 + 806 + if !oldroot.Defined() { 807 + return map[cid.Cid]bool{}, nil 808 + } 809 + 810 + // walk the entire 'new' portion of the tree, marking all referenced cids as 'keep' 811 + keepset := make(map[cid.Cid]bool) 812 + for _, c := range newcids { 813 + keepset[c] = true 814 + oblk, err := bs.Get(ctx, c) 815 + if err != nil { 816 + return nil, err 817 + } 818 + 819 + if err := cbg.ScanForLinks(bytes.NewReader(oblk.RawData()), func(lnk cid.Cid) { 820 + keepset[lnk] = true 821 + }); err != nil { 822 + return nil, err 823 + } 824 + } 825 + 826 + if keepset[oldroot] { 827 + // this should probably never happen, but is technically correct 828 + return nil, nil 829 + } 830 + 831 + // next, walk the old tree from the root, recursing on cids *not* in the keepset. 832 + dropset := make(map[cid.Cid]bool) 833 + dropset[oldroot] = true 834 + queue := []cid.Cid{oldroot} 835 + 836 + for len(queue) > 0 { 837 + c := queue[0] 838 + queue = queue[1:] 839 + 840 + oblk, err := bs.Get(ctx, c) 841 + if err != nil { 842 + return nil, err 843 + } 844 + 845 + if err := cbg.ScanForLinks(bytes.NewReader(oblk.RawData()), func(lnk cid.Cid) { 846 + if !keepset[lnk] { 847 + dropset[lnk] = true 848 + queue = append(queue, lnk) 849 + } 850 + }); err != nil { 851 + return nil, err 852 + } 853 + } 854 + 855 + return dropset, nil 856 + } 857 + 779 858 func (cs *CarStore) ImportSlice(ctx context.Context, uid models.Uid, prev *cid.Cid, carslice []byte) (cid.Cid, *DeltaSession, error) { 780 859 ctx, span := otel.Tracer("carstore").Start(ctx, "ImportSlice") 781 860 defer span.End() ··· 794 873 return cid.Undef, nil, err 795 874 } 796 875 876 + var cids []cid.Cid 797 877 for { 798 878 blk, err := carr.Next() 799 879 if err != nil { ··· 802 882 } 803 883 return cid.Undef, nil, err 804 884 } 885 + 886 + cids = append(cids, blk.Cid()) 805 887 806 888 if err := ds.Put(ctx, blk); err != nil { 807 889 return cid.Undef, nil, err 808 890 } 809 891 } 892 + 893 + base := cid.Undef 894 + if prev != nil { 895 + base = *prev 896 + } 897 + 898 + rmcids, err := BlockDiff(ctx, ds, base, cids) 899 + if err != nil { 900 + return cid.Undef, nil, err 901 + } 902 + 903 + ds.rmcids = rmcids 810 904 811 905 return carr.Header.Roots[0], ds, nil 812 906 }
+86 -2
repo/cbor_gen.go
··· 25 25 } 26 26 27 27 cw := cbg.NewCborWriter(w) 28 + fieldCount := 6 28 29 29 - if _, err := cw.Write([]byte{165}); err != nil { 30 + if t.Rev == "" { 31 + fieldCount-- 32 + } 33 + 34 + if _, err := cw.Write(cbg.CborEncodeMajorType(cbg.MajMap, uint64(fieldCount))); err != nil { 30 35 return err 31 36 } 32 37 ··· 51 56 } 52 57 if _, err := cw.WriteString(string(t.Did)); err != nil { 53 58 return err 59 + } 60 + 61 + // t.Rev (string) (string) 62 + if t.Rev != "" { 63 + 64 + if len("rev") > cbg.MaxLength { 65 + return xerrors.Errorf("Value in field \"rev\" was too long") 66 + } 67 + 68 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("rev"))); err != nil { 69 + return err 70 + } 71 + if _, err := io.WriteString(w, string("rev")); err != nil { 72 + return err 73 + } 74 + 75 + if len(t.Rev) > cbg.MaxLength { 76 + return xerrors.Errorf("Value in field t.Rev was too long") 77 + } 78 + 79 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Rev))); err != nil { 80 + return err 81 + } 82 + if _, err := io.WriteString(w, string(t.Rev)); err != nil { 83 + return err 84 + } 54 85 } 55 86 56 87 // t.Sig ([]uint8) (slice) ··· 187 218 } 188 219 189 220 t.Did = string(sval) 221 + } 222 + // t.Rev (string) (string) 223 + case "rev": 224 + 225 + { 226 + sval, err := cbg.ReadString(cr) 227 + if err != nil { 228 + return err 229 + } 230 + 231 + t.Rev = string(sval) 190 232 } 191 233 // t.Sig ([]uint8) (slice) 192 234 case "sig": ··· 288 330 } 289 331 290 332 cw := cbg.NewCborWriter(w) 333 + fieldCount := 5 291 334 292 - if _, err := cw.Write([]byte{164}); err != nil { 335 + if t.Rev == "" { 336 + fieldCount-- 337 + } 338 + 339 + if _, err := cw.Write(cbg.CborEncodeMajorType(cbg.MajMap, uint64(fieldCount))); err != nil { 293 340 return err 294 341 } 295 342 ··· 314 361 } 315 362 if _, err := cw.WriteString(string(t.Did)); err != nil { 316 363 return err 364 + } 365 + 366 + // t.Rev (string) (string) 367 + if t.Rev != "" { 368 + 369 + if len("rev") > cbg.MaxLength { 370 + return xerrors.Errorf("Value in field \"rev\" was too long") 371 + } 372 + 373 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("rev"))); err != nil { 374 + return err 375 + } 376 + if _, err := io.WriteString(w, string("rev")); err != nil { 377 + return err 378 + } 379 + 380 + if len(t.Rev) > cbg.MaxLength { 381 + return xerrors.Errorf("Value in field t.Rev was too long") 382 + } 383 + 384 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Rev))); err != nil { 385 + return err 386 + } 387 + if _, err := io.WriteString(w, string(t.Rev)); err != nil { 388 + return err 389 + } 317 390 } 318 391 319 392 // t.Data (cid.Cid) (struct) ··· 426 499 } 427 500 428 501 t.Did = string(sval) 502 + } 503 + // t.Rev (string) (string) 504 + case "rev": 505 + 506 + { 507 + sval, err := cbg.ReadString(cr) 508 + if err != nil { 509 + return err 510 + } 511 + 512 + t.Rev = string(sval) 429 513 } 430 514 // t.Data (cid.Cid) (struct) 431 515 case "data":
+9 -8
repo/repo.go
··· 19 19 ) 20 20 21 21 // current version of repo currently implemented 22 - const ATP_REPO_VERSION int64 = 2 22 + const ATP_REPO_VERSION int64 = 3 23 + 24 + const ATP_REPO_VERSION_2 int64 = 2 23 25 24 26 type SignedCommit struct { 25 27 Did string `cborgen:"did"` ··· 27 29 Prev *cid.Cid `cborgen:"prev"` 28 30 Data cid.Cid `cborgen:"data"` 29 31 Sig []byte `cborgen:"sig"` 32 + Rev string `cborgen:"rev,omitempty"` 30 33 } 31 34 32 35 type UnsignedCommit struct { ··· 34 37 Version int64 `cborgen:"version"` 35 38 Prev *cid.Cid `cborgen:"prev"` 36 39 Data cid.Cid `cborgen:"data"` 40 + Rev string `cborgen:"rev,omitempty"` 37 41 } 38 42 39 43 type Repo struct { ··· 55 59 Version: sc.Version, 56 60 Prev: sc.Prev, 57 61 Data: sc.Data, 62 + Rev: sc.Rev, 58 63 } 59 64 } 60 65 ··· 131 136 return nil, fmt.Errorf("loading root from blockstore: %w", err) 132 137 } 133 138 134 - if sc.Version != ATP_REPO_VERSION { 139 + if sc.Version != ATP_REPO_VERSION && sc.Version != ATP_REPO_VERSION_2 { 135 140 return nil, fmt.Errorf("unsupported repo version: %d", sc.Version) 136 141 } 137 142 ··· 262 267 return cid.Undef, err 263 268 } 264 269 265 - var nprev *cid.Cid 266 - if r.repoCid.Defined() { 267 - nprev = &r.repoCid 268 - } 269 - 270 270 ncom := UnsignedCommit{ 271 271 Did: r.RepoDid(), 272 272 Version: ATP_REPO_VERSION, 273 - Prev: nprev, 274 273 Data: rcid, 274 + Rev: NextTID(), 275 275 } 276 276 277 277 sb, err := ncom.BytesForSigning() ··· 289 289 Version: ncom.Version, 290 290 Prev: ncom.Prev, 291 291 Data: ncom.Data, 292 + Rev: ncom.Rev, 292 293 } 293 294 294 295 nsccid, err := r.cst.Put(ctx, &nsc)
+57
repomgr/ingest_test.go
··· 6 6 "fmt" 7 7 "os" 8 8 "path/filepath" 9 + "strings" 9 10 "testing" 10 11 11 12 atproto "github.com/bluesky-social/indigo/api/atproto" ··· 228 229 t.Fatal(err) 229 230 } 230 231 } 232 + 233 + func TestDuplicateRecord(t *testing.T) { 234 + dir, err := os.MkdirTemp("", "integtest") 235 + if err != nil { 236 + t.Fatal(err) 237 + } 238 + 239 + maindb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "test.sqlite"))) 240 + if err != nil { 241 + t.Fatal(err) 242 + } 243 + maindb.AutoMigrate(models.ActorInfo{}) 244 + 245 + did := "did:plc:beepboop" 246 + maindb.Create(&models.ActorInfo{ 247 + Did: did, 248 + Uid: 1, 249 + }) 250 + 251 + cs := testCarstore(t, dir) 252 + 253 + repoman := NewRepoManager(cs, &util.FakeKeyManager{}) 254 + 255 + ctx := context.TODO() 256 + if err := repoman.InitNewActor(ctx, 1, "hello.world", "did:plc:foobar", "", "", ""); err != nil { 257 + t.Fatal(err) 258 + } 259 + 260 + p1, _, err := repoman.CreateRecord(ctx, 1, "app.bsky.feed.post", &bsky.FeedPost{ 261 + Text: fmt.Sprintf("hello friend"), 262 + }) 263 + if err != nil { 264 + t.Fatal(err) 265 + } 266 + 267 + p2, _, err := repoman.CreateRecord(ctx, 1, "app.bsky.feed.post", &bsky.FeedPost{ 268 + Text: fmt.Sprintf("hello friend"), 269 + }) 270 + if err != nil { 271 + t.Fatal(err) 272 + } 273 + 274 + rkey2 := strings.Split(p2, "/")[1] 275 + if err := repoman.DeleteRecord(ctx, 1, "app.bsky.feed.post", rkey2); err != nil { 276 + t.Fatal(err) 277 + } 278 + 279 + rkey1 := strings.Split(p1, "/")[1] 280 + c, rec, err := repoman.GetRecord(ctx, 1, "app.bsky.feed.post", rkey1, cid.Undef) 281 + if err != nil { 282 + t.Fatal(err) 283 + } 284 + 285 + _ = c 286 + _ = rec 287 + }