this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

okay, its a generator now

authored by

whyrusleeping and committed by
whyrusleeping
fcfd5d67 12f6c03f

+57 -88
+57 -88
repo/stream.go
··· 6 6 "fmt" 7 7 "io" 8 8 "log/slog" 9 - "sync" 10 9 11 10 "github.com/bluesky-social/indigo/mst" 12 11 "github.com/bluesky-social/indigo/repo/carutil" ··· 17 16 ) 18 17 19 18 type waitingBlockstore struct { 20 - lk sync.Mutex 21 - blockWaits map[cid.Cid]chan block.Block 22 19 otherBlocks map[cid.Cid]block.Block 23 20 streamComplete bool 21 + 22 + r *carutil.Reader 24 23 } 25 24 26 25 func newWaitingBlockstore() *waitingBlockstore { 27 26 return &waitingBlockstore{ 28 - blockWaits: make(map[cid.Cid]chan block.Block), 29 27 otherBlocks: make(map[cid.Cid]block.Block), 30 28 } 31 29 } 32 30 33 - func (bs *waitingBlockstore) Get(ctx context.Context, cc cid.Cid) (block.Block, error) { 34 - bs.lk.Lock() 31 + func (bs *waitingBlockstore) readUntilBlock(ctx context.Context, cc cid.Cid) (block.Block, error) { 32 + for { 33 + blk, err := bs.r.NextBlock(repoBlockBufferPool, repoBlockBufferSize) 34 + if err != nil { 35 + if err == io.EOF { 36 + break 37 + } 38 + return nil, fmt.Errorf("reading block from CAR: %w", err) 39 + } 40 + 41 + if blk.Cid() == cc { 42 + return blk, nil 43 + } 44 + 45 + bs.otherBlocks[blk.Cid()] = blk 46 + } 47 + 48 + bs.streamComplete = true 35 49 50 + return nil, io.EOF 51 + } 52 + 53 + func (bs *waitingBlockstore) Get(ctx context.Context, cc cid.Cid) (block.Block, error) { 36 54 if blk, ok := bs.otherBlocks[cc]; ok { 37 55 delete(bs.otherBlocks, cc) 38 - bs.lk.Unlock() 39 56 return blk, nil 40 57 } 41 58 42 59 if bs.streamComplete { 43 - bs.lk.Unlock() 44 60 return nil, ErrMissingBlock 45 61 } 46 62 47 - bw, ok := bs.blockWaits[cc] 48 - if ok { 49 - bs.lk.Unlock() 50 - return nil, fmt.Errorf("somehow already have active wait for block in question: %s", cc) 63 + blk, err := bs.readUntilBlock(ctx, cc) 64 + if err != nil { 65 + return nil, err 51 66 } 52 67 53 - bw = make(chan block.Block, 1) 54 - 55 - bs.blockWaits[cc] = bw 56 - 57 - bs.lk.Unlock() 58 - 59 - select { 60 - case blk, ok := <-bw: 61 - if !ok { 62 - return nil, ErrMissingBlock 63 - } 64 - 65 - return blk, nil 66 - case <-ctx.Done(): 67 - return nil, ctx.Err() 68 - 69 - } 68 + return blk, nil 70 69 } 71 70 72 71 var ErrMissingBlock = fmt.Errorf("block was missing from archive") 73 72 74 73 func (bs *waitingBlockstore) Put(ctx context.Context, blk block.Block) error { 75 - bs.lk.Lock() 76 - defer bs.lk.Unlock() 74 + return fmt.Errorf("put is not needed") 75 + /* 76 + bw, ok := bs.blockWaits[blk.Cid()] 77 + if ok { 78 + bw <- blk 79 + delete(bs.blockWaits, blk.Cid()) 80 + return nil 81 + } 77 82 78 - bw, ok := bs.blockWaits[blk.Cid()] 79 - if ok { 80 - bw <- blk 81 - delete(bs.blockWaits, blk.Cid()) 83 + bs.otherBlocks[blk.Cid()] = blk.(*block.BasicBlock) 82 84 return nil 83 - } 84 - 85 - bs.otherBlocks[blk.Cid()] = blk.(*block.BasicBlock) 86 - return nil 85 + */ 87 86 } 88 87 89 88 func (bs *waitingBlockstore) Complete() { 90 - bs.lk.Lock() 91 - defer bs.lk.Unlock() 92 - bs.streamComplete = true 93 - for _, ch := range bs.blockWaits { 94 - close(ch) 95 - } 96 89 } 97 90 98 91 func StreamRepoRecords(ctx context.Context, r io.Reader, prefix string, cb func(k string, c cid.Cid, v []byte) error) error { ··· 105 98 } 106 99 107 100 bs := newWaitingBlockstore() 108 - cst := util.CborStore(bs) 109 101 110 - var wg sync.WaitGroup 111 - wg.Add(1) 112 - var walkErr error 113 - go func() { 114 - defer wg.Done() 115 - 116 - var sc SignedCommit 117 - if err := cst.Get(ctx, root, &sc); err != nil { 118 - walkErr = fmt.Errorf("loading root from blockstore: %w", err) 119 - return 120 - } 102 + bs.r = br 121 103 122 - if sc.Version != ATP_REPO_VERSION && sc.Version != ATP_REPO_VERSION_2 { 123 - walkErr = fmt.Errorf("unsupported repo version: %d", sc.Version) 124 - return 125 - } 126 - // TODO: verify that signature 104 + cst := util.CborStore(bs) 127 105 128 - t := mst.LoadMST(cst, sc.Data) 106 + var sc SignedCommit 107 + if err := cst.Get(ctx, root, &sc); err != nil { 108 + return fmt.Errorf("loading root from blockstore: %w", err) 109 + } 129 110 130 - if err := t.WalkLeavesFrom(ctx, prefix, func(k string, val cid.Cid) error { 131 - blk, err := bs.Get(ctx, val) 132 - if err != nil { 133 - slog.Error("failed to get record from tree", "key", k, "cid", val, "error", err) 134 - return nil 135 - } 111 + if sc.Version != ATP_REPO_VERSION && sc.Version != ATP_REPO_VERSION_2 { 112 + return fmt.Errorf("unsupported repo version: %d", sc.Version) 113 + } 114 + // TODO: verify that signature 136 115 137 - return cb(k, val, blk.RawData()) 138 - }); err != nil { 139 - walkErr = fmt.Errorf("failed to walk mst: %w", err) 140 - } 141 - }() 116 + t := mst.LoadMST(cst, sc.Data) 142 117 143 - for { 144 - blk, err := br.NextBlock(repoBlockBufferPool, repoBlockBufferSize) 118 + if err := t.WalkLeavesFrom(ctx, prefix, func(k string, val cid.Cid) error { 119 + blk, err := bs.Get(ctx, val) 145 120 if err != nil { 146 - if err == io.EOF { 147 - break 148 - } 149 - return fmt.Errorf("reading block from CAR: %w", err) 121 + slog.Error("failed to get record from tree", "key", k, "cid", val, "error", err) 122 + return nil 150 123 } 151 124 152 - if err := bs.Put(ctx, blk); err != nil { 153 - return fmt.Errorf("copying block to store: %w", err) 154 - } 125 + return cb(k, val, blk.RawData()) 126 + }); err != nil { 127 + return fmt.Errorf("failed to walk mst: %w", err) 155 128 } 156 129 157 - bs.Complete() 158 - 159 - wg.Wait() 160 - 161 - return walkErr 130 + return nil 162 131 }