this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

try a pool

authored by

whyrusleeping and committed by
whyrusleeping
a5f57b26 5cfe465b

+64 -17
+32 -12
repo/carutil/reader.go
··· 37 37 const MaxAllowedSectionSize = 32 << 20 38 38 39 39 func (r *Reader) NextBlock() (*BasicBlock, error) { 40 - data, err := ldRead(r.r) 40 + blk, _, err := r.NextBlockBuf(nil) 41 41 if err != nil { 42 42 return nil, err 43 + } 44 + 45 + return blk, nil 46 + } 47 + 48 + func (r *Reader) NextBlockBuf(buf []byte) (*BasicBlock, bool, error) { 49 + data, usedBuf, err := ldRead(r.r, buf) 50 + if err != nil { 51 + return nil, false, err 43 52 } 44 53 45 54 n, c, err := cid.CidFromBytes(data) 46 55 if err != nil { 47 - return nil, err 56 + return nil, false, err 48 57 } 49 58 50 - return NewBlockWithCid(data[n:], data, c), nil 59 + return NewBlockWithCid(data[n:], data, c), usedBuf, nil 51 60 } 52 61 53 - func ldRead(r *bufio.Reader) ([]byte, error) { 62 + // reads a length delimited value off of the given reader into the given buf if it's big enough, otherwise allocates a new buffer. 63 + // returns whether or not the passed in buffer was used 64 + func ldRead(r *bufio.Reader, buf []byte) ([]byte, bool, error) { 54 65 if _, err := r.Peek(1); err != nil { // no more blocks, likely clean io.EOF 55 - return nil, err 66 + return nil, false, err 56 67 } 57 68 58 69 l, err := binary.ReadUvarint(r) 59 70 if err != nil { 60 71 if err == io.EOF { 61 - return nil, io.ErrUnexpectedEOF // don't silently pretend this is a clean EOF 72 + return nil, false, io.ErrUnexpectedEOF // don't silently pretend this is a clean EOF 62 73 } 63 - return nil, err 74 + return nil, false, err 64 75 } 65 76 66 77 if l > uint64(MaxAllowedSectionSize) { // Don't OOM 67 - return nil, errors.New("malformed car; header is bigger than util.MaxAllowedSectionSize") 78 + return nil, false, errors.New("malformed car; header is bigger than util.MaxAllowedSectionSize") 68 79 } 69 80 70 - // direct allocation, not great 71 - buf := make([]byte, l) 81 + if l > uint64(len(buf)) { 82 + // direct allocation, not great 83 + buf := make([]byte, l) 84 + if _, err := io.ReadFull(r, buf); err != nil { 85 + return 
nil, false, err 86 + } 87 + 88 + return buf, false, nil 89 + } 90 + 91 + buf = buf[:l] 72 92 if _, err := io.ReadFull(r, buf); err != nil { 73 - return nil, err 93 + return nil, false, err 74 94 } 75 95 76 - return buf, nil 96 + return buf, true, nil 77 97 } 78 98 79 99 type BasicBlock struct {
+32 -5
repo/stream.go
··· 6 6 "fmt" 7 7 "io" 8 8 "log/slog" 9 + "sync" 9 10 10 11 "github.com/bluesky-social/indigo/mst" 11 12 "github.com/bluesky-social/indigo/repo/carutil" ··· 15 16 "go.opentelemetry.io/otel" 16 17 ) 17 18 19 + const bufPoolBlockSize = 512 20 + 21 + var smallBlockPool = &sync.Pool{ 22 + New: func() any { 23 + return make([]byte, bufPoolBlockSize) 24 + }, 25 + } 26 + 18 27 type readStreamBlockstore struct { 19 28 otherBlocks map[cid.Cid]*carutil.BasicBlock 20 29 streamComplete bool 21 30 22 31 r *carutil.Reader 32 + 33 + outOfOrder int 34 + totalBlocks int 23 35 } 24 36 25 37 func newStreamingBlockstore(r *carutil.Reader) *readStreamBlockstore { 26 38 return &readStreamBlockstore{ 27 - otherBlocks: make(map[cid.Cid]*carutil.BasicBlock), 39 + otherBlocks: make(map[cid.Cid]*carutil.BasicBlock, 20), 28 40 r: r, 29 41 } 30 42 } 31 43 32 44 func (bs *readStreamBlockstore) readUntilBlock(ctx context.Context, cc cid.Cid) (*carutil.BasicBlock, error) { 33 45 for { 34 - blk, err := bs.r.NextBlock() 46 + buf := smallBlockPool.Get().([]byte) 47 + blk, used, err := bs.r.NextBlockBuf(buf) 35 48 if err != nil { 49 + smallBlockPool.Put(buf) 36 50 if err == io.EOF { 37 51 break 38 52 } 39 53 return nil, fmt.Errorf("reading block from CAR: %w", err) 40 54 } 41 55 56 + if !used { 57 + smallBlockPool.Put(buf) 58 + } 59 + 60 + bs.totalBlocks++ 42 61 if blk.Cid() == cc { 43 62 return blk, nil 44 63 } 45 64 65 + bs.outOfOrder++ 46 66 bs.otherBlocks[blk.Cid()] = blk 47 67 } 48 68 ··· 81 101 82 102 if err := cb(blk.RawData()); err != nil { 83 103 return err 104 + } 105 + 106 + if len(blk.BaseBuffer()) == bufPoolBlockSize { 107 + smallBlockPool.Put(blk.BaseBuffer()) 84 108 } 85 109 86 110 return nil ··· 113 137 if sc.Version != ATP_REPO_VERSION && sc.Version != ATP_REPO_VERSION_2 { 114 138 return "", fmt.Errorf("unsupported repo version: %d", sc.Version) 115 139 } 140 + 116 141 // TODO: verify that signature 117 142 118 143 t := mst.LoadMST(cst, sc.Data) 119 144 120 145 if err := 
t.WalkLeavesFrom(ctx, prefix, func(k string, val cid.Cid) error { 121 - blk, err := bs.Get(ctx, val) 122 - if err != nil { 146 + if err := bs.View(val, func(data []byte) error { 147 + return cb(k, val, data) 148 + }); err != nil { 123 149 slog.Error("failed to get record from tree", "key", k, "cid", val, "error", err) 124 150 return nil 125 151 } 126 152 127 - return cb(k, val, blk.RawData()) 153 + return nil 128 154 }); err != nil { 129 155 return "", fmt.Errorf("failed to walk mst: %w", err) 130 156 } 131 157 158 + fmt.Println("out of order blocks: ", bs.outOfOrder, bs.totalBlocks) 132 159 return sc.Rev, nil 133 160 }