this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add a car reader implementation that uses a buffer pool to improve allocations

+116 -4
+102
repo/carutil/reader.go
··· 1 + package carutil 2 + 3 + import ( 4 + "bufio" 5 + "encoding/binary" 6 + "errors" 7 + "fmt" 8 + "io" 9 + "sync" 10 + 11 + blocks "github.com/ipfs/go-block-format" 12 + "github.com/ipfs/go-cid" 13 + car "github.com/ipld/go-car" 14 + ) 15 + 16 + type Reader struct { 17 + r *bufio.Reader 18 + 19 + bufs [][]byte 20 + } 21 + 22 + func NewReader(r *bufio.Reader) (*Reader, cid.Cid, error) { 23 + h, err := car.ReadHeader(r) 24 + if err != nil { 25 + return nil, cid.Undef, err 26 + } 27 + 28 + if h.Version != 1 { 29 + return nil, cid.Undef, fmt.Errorf("invalid version: %d", h.Version) 30 + } 31 + 32 + if len(h.Roots) != 1 { 33 + return nil, cid.Undef, fmt.Errorf("expected only 1 root in car file") 34 + } 35 + 36 + return &Reader{ 37 + r: r, 38 + bufs: make([][]byte, 0, 10), 39 + }, h.Roots[0], nil 40 + } 41 + 42 + func (r *Reader) Free(alloc *sync.Pool) { 43 + for _, b := range r.bufs { 44 + alloc.Put(b) 45 + } 46 + r.bufs = nil 47 + } 48 + 49 + const MaxAllowedSectionSize = 32 << 20 50 + 51 + func (r *Reader) NextBlock(allocator *sync.Pool, allocMax uint64) (blocks.Block, error) { 52 + data, err := ldRead(r.r, allocator, allocMax) 53 + if err != nil { 54 + return nil, err 55 + } 56 + 57 + r.bufs = append(r.bufs, data) 58 + 59 + n, c, err := cid.CidFromBytes(data) 60 + if err != nil { 61 + return nil, err 62 + } 63 + 64 + return blocks.NewBlockWithCid(data[n:], c) 65 + } 66 + 67 + func ldRead(r *bufio.Reader, alloc *sync.Pool, allocMax uint64) ([]byte, error) { 68 + if _, err := r.Peek(1); err != nil { // no more blocks, likely clean io.EOF 69 + return nil, err 70 + } 71 + 72 + l, err := binary.ReadUvarint(r) 73 + if err != nil { 74 + if err == io.EOF { 75 + return nil, io.ErrUnexpectedEOF // don't silently pretend this is a clean EOF 76 + } 77 + return nil, err 78 + } 79 + 80 + if l > uint64(MaxAllowedSectionSize) { // Don't OOM 81 + return nil, errors.New("malformed car; header is bigger than util.MaxAllowedSectionSize") 82 + } 83 + 84 + if l > allocMax { 85 + // direct allocation, not great 86 + buf := make([]byte, l) 87 + if _, err := io.ReadFull(r, buf); err != nil { 88 + return nil, err 89 + } 90 + 91 + return buf, nil 92 + } 93 + 94 + buf := alloc.Get().([]byte) 95 + buf = buf[:l] 96 + 97 + if _, err := io.ReadFull(r, buf); err != nil { 98 + return nil, err 99 + } 100 + 101 + return buf, nil 102 + }
+14 -4
repo/repo.go
··· 1 1 package repo 2 2 3 3 import ( 4 + "bufio" 4 5 "bytes" 5 6 "context" 6 7 "fmt" 7 8 "io" 9 + "sync" 8 10 9 11 "github.com/bluesky-social/indigo/atproto/repo" 10 12 lexutil "github.com/bluesky-social/indigo/lex/util" 11 13 "github.com/bluesky-social/indigo/mst" 14 + "github.com/bluesky-social/indigo/repo/carutil" 12 15 "github.com/bluesky-social/indigo/util" 13 16 "github.com/ipfs/go-cid" 14 17 cbor "github.com/ipfs/go-ipld-cbor" 15 - "github.com/ipld/go-car/v2" 16 18 cbg "github.com/whyrusleeping/cbor-gen" 17 19 "go.opentelemetry.io/otel" 18 20 ) ··· 73 75 return buf.Bytes(), nil 74 76 } 75 77 78 + const repoBlockBufferSize = 128 << 10 79 + 80 + var repoBlockBufferPool = &sync.Pool{ 81 + New: func() any { 82 + return make([]byte, repoBlockBufferSize) 83 + }, 84 + } 85 + 76 86 func IngestRepo(ctx context.Context, bs cbor.IpldBlockstore, r io.Reader) (cid.Cid, error) { 77 87 ctx, span := otel.Tracer("repo").Start(ctx, "Ingest") 78 88 defer span.End() 79 89 80 - br, err := car.NewBlockReader(r) 90 + br, root, err := carutil.NewReader(bufio.NewReader(r)) 81 91 if err != nil { 82 92 return cid.Undef, fmt.Errorf("opening CAR block reader: %w", err) 83 93 } 84 94 85 95 for { 86 - blk, err := br.Next() 96 + blk, err := br.NextBlock(repoBlockBufferPool, repoBlockBufferSize) 87 97 if err != nil { 88 98 if err == io.EOF { 89 99 break ··· 96 106 } 97 107 } 98 108 99 - return br.Roots[0], nil 109 + return root, nil 100 110 } 101 111 102 112 func ReadRepoFromCar(ctx context.Context, r io.Reader) (*Repo, error) {