this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

in the case of missing blocks, fall back to full repo fetch (#332)

If for some reason we end up with missing data, add a multi-stage
fallback to first fetch the missing revision range ( in the case of an
event slice erroring) and then if data is still missing, fall back to
fetching the entire repo.

also added a debug command to fetch and verify data in a repo.

authored by

Whyrusleeping and committed by
GitHub
976cb2bc 6442e521

+65 -2
+2 -1
bgs/bgs.go
··· 31 31 32 32 "github.com/gorilla/websocket" 33 33 "github.com/ipfs/go-cid" 34 + ipld "github.com/ipfs/go-ipld-format" 34 35 logging "github.com/ipfs/go-log" 35 36 "github.com/labstack/echo/v4" 36 37 "github.com/labstack/echo/v4/middleware" ··· 765 766 if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, evt.Since, evt.Rev, evt.Blocks, evt.Ops); err != nil { 766 767 log.Warnw("failed handling event", "err", err, "host", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String()) 767 768 768 - if errors.Is(err, carstore.ErrRepoBaseMismatch) { 769 + if errors.Is(err, carstore.ErrRepoBaseMismatch) || ipld.IsNotFound(err) { 769 770 ai, err := bgs.Index.LookupUser(ctx, u.ID) 770 771 if err != nil { 771 772 return err
+3 -1
carstore/bs.go
··· 258 258 seq int 259 259 readonly bool 260 260 cs *CarStore 261 + lastRev string 261 262 } 262 263 263 264 func (cs *CarStore) checkLastShardCache(user models.Uid) *CarShard { ··· 332 333 baseCid: lastShard.Root.CID, 333 334 cs: cs, 334 335 seq: lastShard.Seq + 1, 336 + lastRev: lastShard.Rev, 335 337 }, nil 336 338 } 337 339 ··· 880 882 881 883 rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks) 882 884 if err != nil { 883 - return cid.Undef, nil, fmt.Errorf("block diff failed (base=%s): %w", ds.baseCid, err) 885 + return cid.Undef, nil, fmt.Errorf("block diff failed (base=%s,since=%v,rev=%s): %w", ds.baseCid, since, ds.lastRev, err) 884 886 } 885 887 886 888 ds.rmcids = rmcids
+43
cmd/gosky/debug.go
··· 41 41 debugFeedGenCmd, 42 42 debugFeedViewCmd, 43 43 compareStreamsCmd, 44 + debugGetRepoCmd, 44 45 }, 45 46 } 46 47 ··· 746 747 747 748 return nil 748 749 } 750 + 751 + var debugGetRepoCmd = &cli.Command{ 752 + Name: "get-repo", 753 + Flags: []cli.Flag{}, 754 + ArgsUsage: `<did>`, 755 + Action: func(cctx *cli.Context) error { 756 + xrpcc, err := cliutil.GetXrpcClient(cctx, false) 757 + if err != nil { 758 + return err 759 + } 760 + 761 + ctx := context.TODO() 762 + 763 + repobytes, err := comatproto.SyncGetRepo(ctx, xrpcc, cctx.Args().First(), "") 764 + if err != nil { 765 + return fmt.Errorf("getting repo: %w", err) 766 + } 767 + 768 + rep, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repobytes)) 769 + if err != nil { 770 + return err 771 + } 772 + 773 + fmt.Println("Rev: ", rep.SignedCommit().Rev) 774 + var count int 775 + if err := rep.ForEach(ctx, "", func(k string, v cid.Cid) error { 776 + rec, err := rep.Blockstore().Get(ctx, v) 777 + if err != nil { 778 + return err 779 + } 780 + 781 + count++ 782 + _ = rec 783 + return nil 784 + }); err != nil { 785 + return err 786 + } 787 + fmt.Printf("scanned %d records\n", count) 788 + 789 + return nil 790 + }, 791 + }
+17
indexer/indexer.go
··· 21 21 "golang.org/x/time/rate" 22 22 23 23 "github.com/ipfs/go-cid" 24 + ipld "github.com/ipfs/go-ipld-format" 24 25 logging "github.com/ipfs/go-log" 25 26 "go.opentelemetry.io/otel" 26 27 "go.opentelemetry.io/otel/attribute" ··· 464 465 // we probably want alternative ways of doing this for 'very large' or 'very old' repos, but this works for now 465 466 if err := ix.repomgr.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), &rev); err != nil { 466 467 span.RecordError(err) 468 + 469 + if ipld.IsNotFound(err) { 470 + limiter.Wait(ctx) 471 + log.Infow("SyncGetRepo", "did", ai.Did, "user", ai.Handle, "since", "", "fallback", true) 472 + repo, err := comatproto.SyncGetRepo(ctx, c, ai.Did, "") 473 + if err != nil { 474 + reposFetched.WithLabelValues("fail").Inc() 475 + return fmt.Errorf("failed to fetch repo: %w", err) 476 + } 477 + reposFetched.WithLabelValues("success").Inc() 478 + 479 + if err := ix.repomgr.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), nil); err != nil { 480 + span.RecordError(err) 481 + return fmt.Errorf("failed to import backup repo (%s): %w", ai.Did, err) 482 + } 483 + } 467 484 return fmt.Errorf("importing fetched repo (curRev: %s): %w", rev, err) 468 485 } 469 486