this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

manage That Memory

authored by

whyrusleeping and committed by
whyrusleeping
10c463d2 fcfd5d67

+30 -36
+1 -13
repo/carutil/reader.go
··· 15 15 16 16 type Reader struct { 17 17 r *bufio.Reader 18 - 19 - bufs [][]byte 20 18 } 21 19 22 20 func NewReader(r *bufio.Reader) (*Reader, cid.Cid, error) { ··· 34 32 } 35 33 36 34 return &Reader{ 37 - r: r, 38 - bufs: make([][]byte, 0, 10), 35 + r: r, 39 36 }, h.Roots[0], nil 40 37 } 41 38 42 - func (r *Reader) Free(alloc *sync.Pool) { 43 - for _, b := range r.bufs { 44 - alloc.Put(b) 45 - } 46 - r.bufs = nil 47 - } 48 - 49 39 const MaxAllowedSectionSize = 32 << 20 50 40 51 41 func (r *Reader) NextBlock(allocator *sync.Pool, allocMax uint64) (blocks.Block, error) { ··· 53 43 if err != nil { 54 44 return nil, err 55 45 } 56 - 57 - r.bufs = append(r.bufs, data) 58 46 59 47 n, c, err := cid.CidFromBytes(data) 60 48 if err != nil {
+12
repo/repo.go
··· 14 14 "github.com/bluesky-social/indigo/mst" 15 15 "github.com/bluesky-social/indigo/repo/carutil" 16 16 "github.com/bluesky-social/indigo/util" 17 + blocks "github.com/ipfs/go-block-format" 17 18 "github.com/ipfs/go-cid" 18 19 cbor "github.com/ipfs/go-ipld-cbor" 19 20 cbg "github.com/whyrusleeping/cbor-gen" ··· 84 85 New: func() any { 85 86 return make([]byte, repoBlockBufferSize) 86 87 }, 88 + } 89 + 90 + func freeRepoBlock(b blocks.Block) { 91 + bb, ok := b.(*blocks.BasicBlock) 92 + if !ok { 93 + return 94 + } 95 + 96 + if cap(bb.RawData()) == repoBlockBufferSize { 97 + repoBlockBufferPool.Put(bb.RawData()[:repoBlockBufferSize]) 98 + } 87 99 } 88 100 89 101 func IngestRepo(ctx context.Context, bs cbor.IpldBlockstore, r io.Reader) (cid.Cid, error) {
+17 -23
repo/stream.go
··· 15 15 "go.opentelemetry.io/otel" 16 16 ) 17 17 18 - type waitingBlockstore struct { 18 + type readStreamBlockstore struct { 19 19 otherBlocks map[cid.Cid]block.Block 20 20 streamComplete bool 21 21 22 22 r *carutil.Reader 23 + 24 + lastBlockRead block.Block 23 25 } 24 26 25 - func newWaitingBlockstore() *waitingBlockstore { 26 - return &waitingBlockstore{ 27 + func newStreamingBlockstore(r *carutil.Reader) *readStreamBlockstore { 28 + return &readStreamBlockstore{ 27 29 otherBlocks: make(map[cid.Cid]block.Block), 30 + r: r, 28 31 } 29 32 } 30 33 31 - func (bs *waitingBlockstore) readUntilBlock(ctx context.Context, cc cid.Cid) (block.Block, error) { 34 + func (bs *readStreamBlockstore) readUntilBlock(ctx context.Context, cc cid.Cid) (block.Block, error) { 32 35 for { 33 36 blk, err := bs.r.NextBlock(repoBlockBufferPool, repoBlockBufferSize) 34 37 if err != nil { ··· 50 53 return nil, io.EOF 51 54 } 52 55 53 - func (bs *waitingBlockstore) Get(ctx context.Context, cc cid.Cid) (block.Block, error) { 56 + func (bs *readStreamBlockstore) Get(ctx context.Context, cc cid.Cid) (block.Block, error) { 57 + if bs.lastBlockRead != nil { 58 + freeRepoBlock(bs.lastBlockRead) 59 + bs.lastBlockRead = nil 60 + } 61 + 54 62 if blk, ok := bs.otherBlocks[cc]; ok { 55 63 delete(bs.otherBlocks, cc) 64 + bs.lastBlockRead = blk 56 65 return blk, nil 57 66 } 58 67 ··· 65 74 return nil, err 66 75 } 67 76 77 + bs.lastBlockRead = blk 68 78 return blk, nil 69 79 } 70 80 71 81 var ErrMissingBlock = fmt.Errorf("block was missing from archive") 72 82 73 - func (bs *waitingBlockstore) Put(ctx context.Context, blk block.Block) error { 83 + func (bs *readStreamBlockstore) Put(ctx context.Context, blk block.Block) error { 74 84 return fmt.Errorf("put is not needed") 75 - /* 76 - bw, ok := bs.blockWaits[blk.Cid()] 77 - if ok { 78 - bw <- blk 79 - delete(bs.blockWaits, blk.Cid()) 80 - return nil 81 - } 82 - 83 - bs.otherBlocks[blk.Cid()] = blk.(*block.BasicBlock) 84 - return nil 85 - */ 86 - } 87 - 88 - func (bs *waitingBlockstore) Complete() { 89 85 } 90 86 91 87 func StreamRepoRecords(ctx context.Context, r io.Reader, prefix string, cb func(k string, c cid.Cid, v []byte) error) error { ··· 97 93 return fmt.Errorf("opening CAR block reader: %w", err) 98 94 } 99 95 100 - bs := newWaitingBlockstore() 101 - 102 - bs.r = br 96 + bs := newStreamingBlockstore(br) 103 97 104 98 cst := util.CborStore(bs) 105 99