this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix issue with gap filling in index crawler (#71)

The source of the bug is the one-line change in the carstore. The
repomanager changes are the part that could use the closest look:
basically, I'm allowing the first commit in the gap-fill process to dip
into the carstore for the blocks it needs for the diff, while later diffs
are isolated and must pull their blocks from the CAR file.

authored by

Whyrusleeping and committed by
GitHub
c93363ae 1d2c5c4c

+131 -6
+1 -1
carstore/bs.go
··· 344 344 lateSeq = fromShard.Seq 345 345 } 346 346 347 - q := cs.meta.Order("seq desc").Where("usr = ? AND seq >= ?", user, earlySeq) 347 + q := cs.meta.Order("seq desc").Where("usr = ? AND seq > ?", user, earlySeq) 348 348 if lateCid.Defined() { 349 349 q = q.Where("seq <= ?", lateSeq) 350 350 }
+2 -1
indexer/indexer.go
··· 911 911 // this process will send individual indexing events back to the indexer, doing a 'fast forward' of the users entire history 912 912 // we probably want alternative ways of doing this for 'very large' or 'very old' repos, but this works for now 913 913 if err := ix.repomgr.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), curHead); err != nil { 914 - return fmt.Errorf("importing fetched repo: %w", err) 914 + span.RecordError(err) 915 + return fmt.Errorf("importing fetched repo (curHead: %s): %w", from, err) 915 916 } 916 917 917 918 // TODO: this is currently doing too much work, allowing us to ignore the catchup events we've gotten
+110
repomgr/ingest_test.go
··· 1 1 package repomgr 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 6 + "fmt" 5 7 "os" 6 8 "path/filepath" 7 9 "testing" 8 10 11 + bsky "github.com/bluesky-social/indigo/api/bsky" 9 12 "github.com/bluesky-social/indigo/carstore" 13 + "github.com/bluesky-social/indigo/models" 14 + "github.com/bluesky-social/indigo/repo" 10 15 "github.com/bluesky-social/indigo/util" 11 16 "github.com/ipfs/go-cid" 12 17 "gorm.io/driver/sqlite" ··· 66 71 t.Fatal(err) 67 72 } 68 73 } 74 + 75 + func testCarstore(t *testing.T, dir string) *carstore.CarStore { 76 + cardb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "car.sqlite"))) 77 + if err != nil { 78 + t.Fatal(err) 79 + } 80 + 81 + cspath := filepath.Join(dir, "carstore") 82 + if err := os.Mkdir(cspath, 0775); err != nil { 83 + t.Fatal(err) 84 + } 85 + 86 + cs, err := carstore.NewCarStore(cardb, cspath) 87 + if err != nil { 88 + t.Fatal(err) 89 + } 90 + 91 + return cs 92 + } 93 + 94 + func TestIngestWithGap(t *testing.T) { 95 + dir, err := os.MkdirTemp("", "integtest") 96 + if err != nil { 97 + t.Fatal(err) 98 + } 99 + 100 + maindb, err := gorm.Open(sqlite.Open(filepath.Join(dir, "test.sqlite"))) 101 + if err != nil { 102 + t.Fatal(err) 103 + } 104 + maindb.AutoMigrate(models.ActorInfo{}) 105 + 106 + did := "did:plc:beepboop" 107 + maindb.Create(&models.ActorInfo{ 108 + Did: did, 109 + Uid: 1, 110 + }) 111 + 112 + cs := testCarstore(t, dir) 113 + 114 + repoman := NewRepoManager(maindb, cs, &util.FakeKeyManager{}) 115 + 116 + dir2, err := os.MkdirTemp("", "integtest") 117 + if err != nil { 118 + t.Fatal(err) 119 + } 120 + cs2 := testCarstore(t, dir2) 121 + 122 + ctx := context.TODO() 123 + var prev *cid.Cid 124 + for i := 0; i < 5; i++ { 125 + slice, head := doPost(t, cs2, did, prev, i) 126 + 127 + if err := repoman.HandleExternalUserEvent(ctx, 1, 1, did, prev, slice); err != nil { 128 + t.Fatal(err) 129 + } 130 + 131 + prev = &head 132 + } 133 + 134 + latest := *prev 135 + 136 + // now do a few outside of the standard event stream 
flow 137 + for i := 0; i < 5; i++ { 138 + _, head := doPost(t, cs2, did, prev, i) 139 + prev = &head 140 + } 141 + 142 + buf := new(bytes.Buffer) 143 + if err := cs2.ReadUserCar(ctx, 1, latest, *prev, true, buf); err != nil { 144 + t.Fatal(err) 145 + } 146 + 147 + if err := repoman.ImportNewRepo(ctx, 1, did, buf, latest); err != nil { 148 + t.Fatal(err) 149 + } 150 + } 151 + 152 + func doPost(t *testing.T, cs *carstore.CarStore, did string, prev *cid.Cid, postid int) ([]byte, cid.Cid) { 153 + ctx := context.TODO() 154 + ds, err := cs.NewDeltaSession(ctx, 1, prev) 155 + if err != nil { 156 + t.Fatal(err) 157 + } 158 + 159 + r := repo.NewRepo(ctx, did, ds) 160 + 161 + if _, _, err := r.CreateRecord(ctx, "app.bsky.feed.post", &bsky.FeedPost{ 162 + Text: fmt.Sprintf("hello friend %d", postid), 163 + }); err != nil { 164 + t.Fatal(err) 165 + } 166 + 167 + root, err := r.Commit(ctx, func(context.Context, string, []byte) ([]byte, error) { return nil, nil }) 168 + if err != nil { 169 + t.Fatal(err) 170 + } 171 + 172 + slice, err := ds.CloseWithRoot(ctx, root) 173 + if err != nil { 174 + t.Fatal(err) 175 + } 176 + 177 + return slice, root 178 + }
+18 -4
repomgr/repomgr.go
··· 839 839 840 840 var ops []RepoOp 841 841 for _, op := range diffops { 842 - 843 842 out, err := processOp(ctx, bs, op) 844 843 if err != nil { 845 844 log.Errorw("failed to process repo op", "err", err, "path", op.Rpath) ··· 881 880 return nil 882 881 }) 883 882 if err != nil { 884 - return fmt.Errorf("process new repo: %w:", err) 883 + return fmt.Errorf("process new repo (current head: %s): %w:", head, err) 885 884 } 886 885 887 886 return nil ··· 991 990 seen[until] = true 992 991 } 993 992 993 + var baseBs blockstore.Blockstore 994 + if until.Defined() { 995 + bs, err := rm.cs.ReadOnlySession(user) 996 + if err != nil { 997 + return err 998 + } 999 + 1000 + baseBs = bs 1001 + } 1002 + 994 1003 prev := until 995 1004 for i := len(commits) - 1; i >= 0; i-- { 996 1005 root := commits[i] ··· 1023 1032 return ds.CloseWithRoot(ctx, root) 1024 1033 } 1025 1034 1026 - if err := cb(ctx, prev, root, finish, membs); err != nil { 1027 - return fmt.Errorf("cb errored (%d/%d): %w", i, len(commits)-1, err) 1035 + cbs := membs 1036 + if i == len(commits)-1 { 1037 + cbs = util.NewReadThroughBstore(baseBs, membs) 1038 + } 1039 + 1040 + if err := cb(ctx, prev, root, finish, cbs); err != nil { 1041 + return fmt.Errorf("cb errored (%d/%d) root: %s, prev: %s: %w", i, len(commits)-1, root, prev, err) 1028 1042 } 1029 1043 1030 1044 prev = root