this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

implement streaming repo parser

authored by

whyrusleeping and committed by
whyrusleeping
12f6c03f 3ce38b31

+175 -18
+9 -14
cmd/gosky/main.go
··· 635 635 636 636 var repob []byte 637 637 if strings.HasPrefix(arg, "did:") { 638 - xrpcc, err := cliutil.GetXrpcClient(cctx, true) 638 + resp, err := identity.DefaultDirectory().LookupDID(ctx, syntax.DID(arg)) 639 639 if err != nil { 640 640 return err 641 + } 642 + 643 + xrpcc := &xrpc.Client{ 644 + Host: resp.PDSEndpoint(), 641 645 } 642 646 643 647 if arg == "" { ··· 649 653 return err 650 654 } 651 655 repob = rrb 656 + fmt.Println("GOT REPO BYTES") 652 657 } else { 653 658 if len(arg) == 0 { 654 659 return cli.Exit("must specify DID string or repo path", 127) ··· 659 664 } 660 665 661 666 repob = fb 662 - } 663 - 664 - rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repob)) 665 - if err != nil { 666 - return err 667 667 } 668 668 669 669 collection := "app.bsky.feed.post" ··· 673 673 vals := cctx.Bool("values") 674 674 cids := cctx.Bool("cids") 675 675 676 - if err := rr.ForEach(ctx, collection, func(k string, v cid.Cid) error { 676 + if err := repo.StreamRepoRecords(ctx, bytes.NewReader(repob), collection, func(k string, cc cid.Cid, v []byte) error { 677 677 if !strings.HasPrefix(k, collection) { 678 678 return repo.ErrDoneIterating 679 679 } 680 680 681 681 fmt.Print(k) 682 682 if cids { 683 - fmt.Println(" - ", v) 683 + fmt.Println(" - ", cc) 684 684 } else { 685 685 fmt.Println() 686 686 } 687 687 if vals { 688 - b, err := rr.Blockstore().Get(ctx, v) 689 - if err != nil { 690 - return err 691 - } 692 - 693 - convb, err := cborToJson(b.RawData()) 688 + convb, err := cborToJson(v) 694 689 if err != nil { 695 690 return err 696 691 }
+4 -4
cmd/gosky/sync.go
··· 9 9 "github.com/bluesky-social/indigo/atproto/identity" 10 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 11 "github.com/bluesky-social/indigo/util/cliutil" 12 + "github.com/bluesky-social/indigo/xrpc" 12 13 13 14 cli "github.com/urfave/cli/v2" 14 15 ) ··· 53 54 carPath = ident.DID.String() + ".car" 54 55 } 55 56 56 - xrpcc, err := cliutil.GetXrpcClient(cctx, false) 57 - if err != nil { 58 - return err 57 + xrpcc := &xrpc.Client{ 58 + Host: ident.PDSEndpoint(), 59 59 } 60 - xrpcc.Host = ident.PDSEndpoint() 60 + 61 61 if xrpcc.Host == "" { 62 62 return fmt.Errorf("no PDS endpoint for identity") 63 63 }
+162
repo/stream.go
··· 1 + package repo 2 + 3 + import ( 4 + "bufio" 5 + "context" 6 + "fmt" 7 + "io" 8 + "log/slog" 9 + "sync" 10 + 11 + "github.com/bluesky-social/indigo/mst" 12 + "github.com/bluesky-social/indigo/repo/carutil" 13 + "github.com/bluesky-social/indigo/util" 14 + block "github.com/ipfs/go-block-format" 15 + cid "github.com/ipfs/go-cid" 16 + "go.opentelemetry.io/otel" 17 + ) 18 + 19 + type waitingBlockstore struct { 20 + lk sync.Mutex 21 + blockWaits map[cid.Cid]chan block.Block 22 + otherBlocks map[cid.Cid]block.Block 23 + streamComplete bool 24 + } 25 + 26 + func newWaitingBlockstore() *waitingBlockstore { 27 + return &waitingBlockstore{ 28 + blockWaits: make(map[cid.Cid]chan block.Block), 29 + otherBlocks: make(map[cid.Cid]block.Block), 30 + } 31 + } 32 + 33 + func (bs *waitingBlockstore) Get(ctx context.Context, cc cid.Cid) (block.Block, error) { 34 + bs.lk.Lock() 35 + 36 + if blk, ok := bs.otherBlocks[cc]; ok { 37 + delete(bs.otherBlocks, cc) 38 + bs.lk.Unlock() 39 + return blk, nil 40 + } 41 + 42 + if bs.streamComplete { 43 + bs.lk.Unlock() 44 + return nil, ErrMissingBlock 45 + } 46 + 47 + bw, ok := bs.blockWaits[cc] 48 + if ok { 49 + bs.lk.Unlock() 50 + return nil, fmt.Errorf("somehow already have active wait for block in question: %s", cc) 51 + } 52 + 53 + bw = make(chan block.Block, 1) 54 + 55 + bs.blockWaits[cc] = bw 56 + 57 + bs.lk.Unlock() 58 + 59 + select { 60 + case blk, ok := <-bw: 61 + if !ok { 62 + return nil, ErrMissingBlock 63 + } 64 + 65 + return blk, nil 66 + case <-ctx.Done(): 67 + return nil, ctx.Err() 68 + 69 + } 70 + } 71 + 72 + var ErrMissingBlock = fmt.Errorf("block was missing from archive") 73 + 74 + func (bs *waitingBlockstore) Put(ctx context.Context, blk block.Block) error { 75 + bs.lk.Lock() 76 + defer bs.lk.Unlock() 77 + 78 + bw, ok := bs.blockWaits[blk.Cid()] 79 + if ok { 80 + bw <- blk 81 + delete(bs.blockWaits, blk.Cid()) 82 + return nil 83 + } 84 + 85 + bs.otherBlocks[blk.Cid()] = blk.(*block.BasicBlock) 86 + return nil 87 + } 88 + 89 + func (bs *waitingBlockstore) Complete() { 90 + bs.lk.Lock() 91 + defer bs.lk.Unlock() 92 + bs.streamComplete = true 93 + for _, ch := range bs.blockWaits { 94 + close(ch) 95 + } 96 + } 97 + 98 + func StreamRepoRecords(ctx context.Context, r io.Reader, prefix string, cb func(k string, c cid.Cid, v []byte) error) error { 99 + ctx, span := otel.Tracer("repo").Start(ctx, "RepoStream") 100 + defer span.End() 101 + 102 + br, root, err := carutil.NewReader(bufio.NewReader(r)) 103 + if err != nil { 104 + return fmt.Errorf("opening CAR block reader: %w", err) 105 + } 106 + 107 + bs := newWaitingBlockstore() 108 + cst := util.CborStore(bs) 109 + 110 + var wg sync.WaitGroup 111 + wg.Add(1) 112 + var walkErr error 113 + go func() { 114 + defer wg.Done() 115 + 116 + var sc SignedCommit 117 + if err := cst.Get(ctx, root, &sc); err != nil { 118 + walkErr = fmt.Errorf("loading root from blockstore: %w", err) 119 + return 120 + } 121 + 122 + if sc.Version != ATP_REPO_VERSION && sc.Version != ATP_REPO_VERSION_2 { 123 + walkErr = fmt.Errorf("unsupported repo version: %d", sc.Version) 124 + return 125 + } 126 + // TODO: verify that signature 127 + 128 + t := mst.LoadMST(cst, sc.Data) 129 + 130 + if err := t.WalkLeavesFrom(ctx, prefix, func(k string, val cid.Cid) error { 131 + blk, err := bs.Get(ctx, val) 132 + if err != nil { 133 + slog.Error("failed to get record from tree", "key", k, "cid", val, "error", err) 134 + return nil 135 + } 136 + 137 + return cb(k, val, blk.RawData()) 138 + }); err != nil { 139 + walkErr = fmt.Errorf("failed to walk mst: %w", err) 140 + } 141 + }() 142 + 143 + for { 144 + blk, err := br.NextBlock(repoBlockBufferPool, repoBlockBufferSize) 145 + if err != nil { 146 + if err == io.EOF { 147 + break 148 + } 149 + return fmt.Errorf("reading block from CAR: %w", err) 150 + } 151 + 152 + if err := bs.Put(ctx, blk); err != nil { 153 + return fmt.Errorf("copying block to store: %w", err) 154 + } 155 + } 156 + 157 + bs.Complete() 158 + 159 + wg.Wait() 160 + 161 + return walkErr 162 + }