this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add a car reader implementation that uses a buffer pool to improve allocations

authored by

whyrusleeping and committed by
whyrusleeping
3ce38b31 3259b215

+116 -4
+102
repo/carutil/reader.go
··· 1 + package carutil 2 + 3 + import ( 4 + "bufio" 5 + "encoding/binary" 6 + "errors" 7 + "fmt" 8 + "io" 9 + "sync" 10 + 11 + blocks "github.com/ipfs/go-block-format" 12 + "github.com/ipfs/go-cid" 13 + car "github.com/ipld/go-car" 14 + ) 15 + 16 + type Reader struct { 17 + r *bufio.Reader 18 + 19 + bufs [][]byte 20 + } 21 + 22 + func NewReader(r *bufio.Reader) (*Reader, cid.Cid, error) { 23 + h, err := car.ReadHeader(r) 24 + if err != nil { 25 + return nil, cid.Undef, err 26 + } 27 + 28 + if h.Version != 1 { 29 + return nil, cid.Undef, fmt.Errorf("invalid version: %d", h.Version) 30 + } 31 + 32 + if len(h.Roots) != 1 { 33 + return nil, cid.Undef, fmt.Errorf("expected only 1 root in car file") 34 + } 35 + 36 + return &Reader{ 37 + r: r, 38 + bufs: make([][]byte, 0, 10), 39 + }, h.Roots[0], nil 40 + } 41 + 42 + func (r *Reader) Free(alloc *sync.Pool) { 43 + for _, b := range r.bufs { 44 + alloc.Put(b) 45 + } 46 + r.bufs = nil 47 + } 48 + 49 + const MaxAllowedSectionSize = 32 << 20 50 + 51 + func (r *Reader) NextBlock(allocator *sync.Pool, allocMax uint64) (blocks.Block, error) { 52 + data, err := ldRead(r.r, allocator, allocMax) 53 + if err != nil { 54 + return nil, err 55 + } 56 + 57 + r.bufs = append(r.bufs, data) 58 + 59 + n, c, err := cid.CidFromBytes(data) 60 + if err != nil { 61 + return nil, err 62 + } 63 + 64 + return blocks.NewBlockWithCid(data[n:], c) 65 + } 66 + 67 + func ldRead(r *bufio.Reader, alloc *sync.Pool, allocMax uint64) ([]byte, error) { 68 + if _, err := r.Peek(1); err != nil { // no more blocks, likely clean io.EOF 69 + return nil, err 70 + } 71 + 72 + l, err := binary.ReadUvarint(r) 73 + if err != nil { 74 + if err == io.EOF { 75 + return nil, io.ErrUnexpectedEOF // don't silently pretend this is a clean EOF 76 + } 77 + return nil, err 78 + } 79 + 80 + if l > uint64(MaxAllowedSectionSize) { // Don't OOM 81 + return nil, errors.New("malformed car; header is bigger than util.MaxAllowedSectionSize") 82 + } 83 + 84 + if l > allocMax { 85 + // direct allocation, not great 86 + buf := make([]byte, l) 87 + if _, err := io.ReadFull(r, buf); err != nil { 88 + return nil, err 89 + } 90 + 91 + return buf, nil 92 + } 93 + 94 + buf := alloc.Get().([]byte) 95 + buf = buf[:l] 96 + 97 + if _, err := io.ReadFull(r, buf); err != nil { 98 + return nil, err 99 + } 100 + 101 + return buf, nil 102 + }
+14 -4
repo/repo.go
··· 1 1 package repo 2 2 3 3 import ( 4 + "bufio" 4 5 "bytes" 5 6 "context" 6 7 "fmt" 7 8 "io" 9 + "sync" 8 10 9 11 "github.com/bluesky-social/indigo/atproto/repo" 10 12 "github.com/bluesky-social/indigo/atproto/syntax" 11 13 lexutil "github.com/bluesky-social/indigo/lex/util" 12 14 "github.com/bluesky-social/indigo/mst" 15 + "github.com/bluesky-social/indigo/repo/carutil" 13 16 "github.com/bluesky-social/indigo/util" 14 17 "github.com/ipfs/go-cid" 15 18 cbor "github.com/ipfs/go-ipld-cbor" 16 - "github.com/ipld/go-car" 17 19 cbg "github.com/whyrusleeping/cbor-gen" 18 20 "go.opentelemetry.io/otel" 19 21 ) ··· 76 78 return buf.Bytes(), nil 77 79 } 78 80 81 + const repoBlockBufferSize = 128 << 10 82 + 83 + var repoBlockBufferPool = &sync.Pool{ 84 + New: func() any { 85 + return make([]byte, repoBlockBufferSize) 86 + }, 87 + } 88 + 79 89 func IngestRepo(ctx context.Context, bs cbor.IpldBlockstore, r io.Reader) (cid.Cid, error) { 80 90 ctx, span := otel.Tracer("repo").Start(ctx, "Ingest") 81 91 defer span.End() 82 92 83 - br, err := car.NewCarReader(r) 93 + br, root, err := carutil.NewReader(bufio.NewReader(r)) 84 94 if err != nil { 85 95 return cid.Undef, fmt.Errorf("opening CAR block reader: %w", err) 86 96 } 87 97 88 98 for { 89 - blk, err := br.Next() 99 + blk, err := br.NextBlock(repoBlockBufferPool, repoBlockBufferSize) 90 100 if err != nil { 91 101 if err == io.EOF { 92 102 break ··· 99 109 } 100 110 } 101 111 102 - return br.Header.Roots[0], nil 112 + return root, nil 103 113 } 104 114 105 115 func ReadRepoFromCar(ctx context.Context, r io.Reader) (*Repo, error) {