this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

maybe the pool was a bad idea, but the rest of this is fine

authored by

whyrusleeping and committed by
whyrusleeping
5947ca0e e13676ef

+7 -35
+5 -17
repo/carutil/reader.go
··· 6 6 "errors" 7 7 "fmt" 8 8 "io" 9 - "sync" 10 9 11 10 "github.com/ipfs/go-cid" 12 11 car "github.com/ipld/go-car" ··· 37 36 38 37 const MaxAllowedSectionSize = 32 << 20 39 38 40 - func (r *Reader) NextBlock(allocator *sync.Pool, allocMax uint64) (*BasicBlock, error) { 41 - data, err := ldRead(r.r, allocator, allocMax) 39 + func (r *Reader) NextBlock() (*BasicBlock, error) { 40 + data, err := ldRead(r.r) 42 41 if err != nil { 43 42 return nil, err 44 43 } ··· 51 50 return NewBlockWithCid(data[n:], data, c), nil 52 51 } 53 52 54 - func ldRead(r *bufio.Reader, alloc *sync.Pool, allocMax uint64) ([]byte, error) { 53 + func ldRead(r *bufio.Reader) ([]byte, error) { 55 54 if _, err := r.Peek(1); err != nil { // no more blocks, likely clean io.EOF 56 55 return nil, err 57 56 } ··· 68 67 return nil, errors.New("malformed car; header is bigger than util.MaxAllowedSectionSize") 69 68 } 70 69 71 - if l > allocMax { 72 - // direct allocation, not great 73 - buf := make([]byte, l) 74 - if _, err := io.ReadFull(r, buf); err != nil { 75 - return nil, err 76 - } 77 - 78 - return buf, nil 79 - } 80 - 81 - buf := alloc.Get().([]byte) 82 - buf = buf[:l] 83 - 70 + // direct allocation, not great 71 + buf := make([]byte, l) 84 72 if _, err := io.ReadFull(r, buf); err != nil { 85 73 return nil, err 86 74 }
+1 -16
repo/repo.go
··· 6 6 "context" 7 7 "fmt" 8 8 "io" 9 - "sync" 10 9 11 10 "github.com/bluesky-social/indigo/atproto/repo" 12 11 "github.com/bluesky-social/indigo/atproto/syntax" ··· 78 77 return buf.Bytes(), nil 79 78 } 80 79 81 - const repoBlockBufferSize = 128 << 10 82 - 83 - var repoBlockBufferPool = &sync.Pool{ 84 - New: func() any { 85 - return make([]byte, repoBlockBufferSize) 86 - }, 87 - } 88 - 89 - func FreeRepoBlock(b []byte) { 90 - if cap(b) == repoBlockBufferSize { 91 - repoBlockBufferPool.Put(b[:repoBlockBufferSize]) 92 - } 93 - } 94 - 95 80 func IngestRepo(ctx context.Context, bs cbor.IpldBlockstore, r io.Reader) (cid.Cid, error) { 96 81 ctx, span := otel.Tracer("repo").Start(ctx, "Ingest") 97 82 defer span.End() ··· 102 87 } 103 88 104 89 for { 105 - blk, err := br.NextBlock(repoBlockBufferPool, repoBlockBufferSize) 90 + blk, err := br.NextBlock() 106 91 if err != nil { 107 92 if err == io.EOF { 108 93 break
+1 -2
repo/stream.go
··· 31 31 32 32 func (bs *readStreamBlockstore) readUntilBlock(ctx context.Context, cc cid.Cid) (*carutil.BasicBlock, error) { 33 33 for { 34 - blk, err := bs.r.NextBlock(repoBlockBufferPool, repoBlockBufferSize) 34 + blk, err := bs.r.NextBlock() 35 35 if err != nil { 36 36 if err == io.EOF { 37 37 break ··· 83 83 return err 84 84 } 85 85 86 - FreeRepoBlock(blk.BaseBuffer()) 87 86 return nil 88 87 } 89 88