this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

implement view blockstore to have better memory management

authored by

whyrusleeping and committed by
whyrusleeping
06023f76 10c463d2

+40 -48
+16 -23
backfill/backfill.go
··· 5 5 "context" 6 6 "errors" 7 7 "fmt" 8 + "io" 8 9 "log/slog" 9 10 "net/http" 10 11 "strings" ··· 301 302 type recordQueueItem struct { 302 303 recordPath string 303 304 nodeCid cid.Cid 305 + data []byte 304 306 } 305 307 306 308 type recordResult struct { ··· 323 325 return fmt.Sprintf("failed to get repo: %s (%d)", reason, e.StatusCode) 324 326 } 325 327 326 - // Fetches a repo CAR file over HTTP from the indicated host. If successful, parses the CAR and returns repo.Repo 327 - func (b *Backfiller) fetchRepo(ctx context.Context, did, since, host string) (*repo.Repo, error) { 328 + // Fetches a repo CAR file over HTTP from the indicated host. 329 + func (b *Backfiller) fetchRepo(ctx context.Context, did, since, host string) (io.ReadCloser, error) { 328 330 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", host, did) 329 331 330 332 if since != "" { ··· 366 368 counter: backfillBytesProcessed.WithLabelValues(b.Name), 367 369 } 368 370 369 - defer instrumentedReader.Close() 370 - 371 - repo, err := repo.ReadRepoFromCar(ctx, instrumentedReader) 372 - if err != nil { 373 - return nil, fmt.Errorf("failed to parse repo from CAR file: %w", err) 374 - } 375 - return repo, nil 371 + return &instrumentedReader, nil 376 372 } 377 373 378 374 // BackfillRepo backfills a repo ··· 390 386 } 391 387 log.Info(fmt.Sprintf("processing backfill for %s", repoDID)) 392 388 393 - var r *repo.Repo 389 + var r io.ReadCloser 394 390 if b.tryRelayRepoFetch { 395 391 rr, err := b.fetchRepo(ctx, repoDID, job.Rev(), b.RelayHost) 396 392 if err != nil { ··· 426 422 recordQueue := make(chan recordQueueItem, numRoutines) 427 423 recordResults := make(chan recordResult, numRoutines) 428 424 425 + var rev string 429 426 // Producer routine 430 427 go func() { 431 428 defer close(recordQueue) 432 - if err := r.ForEach(ctx, b.NSIDFilter, func(recordPath string, nodeCid cid.Cid) error { 429 + rrev, err := repo.StreamRepoRecords(ctx, r, b.NSIDFilter, func(recordPath string, nodeCid cid.Cid, data []byte) error { 433 430 numRecords++ 434 - recordQueue <- recordQueueItem{recordPath: recordPath, nodeCid: nodeCid} 431 + recordQueue <- recordQueueItem{recordPath: recordPath, nodeCid: nodeCid, data: data} 435 432 return nil 436 - }); err != nil { 433 + }) 434 + if err != nil { 437 435 log.Error("failed to iterate records in repo", "err", err) 438 436 } 437 + 438 + rev = rrev 439 439 }() 440 - 441 - rev := r.SignedCommit().Rev 442 440 443 441 // Consumer routines 444 442 wg := sync.WaitGroup{} ··· 447 445 go func() { 448 446 defer wg.Done() 449 447 for item := range recordQueue { 450 - blk, err := r.Blockstore().Get(ctx, item.nodeCid) 451 - if err != nil { 452 - recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to get blocks for record: %w", err)} 453 - continue 454 - } 455 448 456 - raw := blk.RawData() 449 + raw := item.data 457 450 458 - err = b.HandleCreateRecord(ctx, repoDID, rev, item.recordPath, &raw, &item.nodeCid) 451 + err := b.HandleCreateRecord(ctx, repoDID, rev, item.recordPath, &raw, &item.nodeCid) 459 452 if err != nil { 460 453 recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)} 461 454 continue ··· 483 476 close(recordResults) 484 477 resultWG.Wait() 485 478 486 - if err := job.SetRev(ctx, r.SignedCommit().Rev); err != nil { 479 + if err := job.SetRev(ctx, rev); err != nil { 487 480 log.Error("failed to update rev after backfilling repo", "err", err) 488 481 } 489 482
+1 -1
cmd/gosky/main.go
··· 673 673 vals := cctx.Bool("values") 674 674 cids := cctx.Bool("cids") 675 675 676 - if err := repo.StreamRepoRecords(ctx, bytes.NewReader(repob), collection, func(k string, cc cid.Cid, v []byte) error { 676 + if _, err := repo.StreamRepoRecords(ctx, bytes.NewReader(repob), collection, func(k string, cc cid.Cid, v []byte) error { 677 677 if !strings.HasPrefix(k, collection) { 678 678 return repo.ErrDoneIterating 679 679 }
+3 -9
repo/repo.go
··· 14 14 "github.com/bluesky-social/indigo/mst" 15 15 "github.com/bluesky-social/indigo/repo/carutil" 16 16 "github.com/bluesky-social/indigo/util" 17 - blocks "github.com/ipfs/go-block-format" 18 17 "github.com/ipfs/go-cid" 19 18 cbor "github.com/ipfs/go-ipld-cbor" 20 19 cbg "github.com/whyrusleeping/cbor-gen" ··· 87 86 }, 88 87 } 89 88 90 - func freeRepoBlock(b blocks.Block) { 91 - bb, ok := b.(*blocks.BasicBlock) 92 - if !ok { 93 - return 94 - } 95 - 96 - if cap(bb.RawData()) == repoBlockBufferSize { 97 - repoBlockBufferPool.Put(bb.RawData()[:repoBlockBufferSize]) 89 + func FreeRepoBlock(b []byte) { 90 + if cap(b) == repoBlockBufferSize { 91 + repoBlockBufferPool.Put(b[:repoBlockBufferSize]) 98 92 } 99 93 } 100 94
+20 -15
repo/stream.go
··· 20 20 streamComplete bool 21 21 22 22 r *carutil.Reader 23 - 24 - lastBlockRead block.Block 25 23 } 26 24 27 25 func newStreamingBlockstore(r *carutil.Reader) *readStreamBlockstore { ··· 54 52 } 55 53 56 54 func (bs *readStreamBlockstore) Get(ctx context.Context, cc cid.Cid) (block.Block, error) { 57 - if bs.lastBlockRead != nil { 58 - freeRepoBlock(bs.lastBlockRead) 59 - bs.lastBlockRead = nil 60 - } 61 - 62 55 if blk, ok := bs.otherBlocks[cc]; ok { 63 56 delete(bs.otherBlocks, cc) 64 - bs.lastBlockRead = blk 65 57 return blk, nil 66 58 } 67 59 ··· 74 66 return nil, err 75 67 } 76 68 77 - bs.lastBlockRead = blk 78 69 return blk, nil 79 70 } 80 71 72 + func (bs *readStreamBlockstore) View(cc cid.Cid, cb func([]byte) error) error { 73 + blk, err := bs.Get(context.TODO(), cc) 74 + if err != nil { 75 + return err 76 + } 77 + 78 + if err := cb(blk.RawData()); err != nil { 79 + return err 80 + } 81 + 82 + FreeRepoBlock(blk.RawData()) 83 + return nil 84 + } 85 + 81 86 var ErrMissingBlock = fmt.Errorf("block was missing from archive") 82 87 83 88 func (bs *readStreamBlockstore) Put(ctx context.Context, blk block.Block) error { 84 89 return fmt.Errorf("put is not needed") 85 90 } 86 91 87 - func StreamRepoRecords(ctx context.Context, r io.Reader, prefix string, cb func(k string, c cid.Cid, v []byte) error) error { 92 + func StreamRepoRecords(ctx context.Context, r io.Reader, prefix string, cb func(k string, c cid.Cid, v []byte) error) (string, error) { 88 93 ctx, span := otel.Tracer("repo").Start(ctx, "RepoStream") 89 94 defer span.End() 90 95 91 96 br, root, err := carutil.NewReader(bufio.NewReader(r)) 92 97 if err != nil { 93 - return fmt.Errorf("opening CAR block reader: %w", err) 98 + return "", fmt.Errorf("opening CAR block reader: %w", err) 94 99 } 95 100 96 101 bs := newStreamingBlockstore(br) ··· 99 104 100 105 var sc SignedCommit 101 106 if err := cst.Get(ctx, root, &sc); err != nil { 102 - return fmt.Errorf("loading root from blockstore: %w", err) 107 + return "", fmt.Errorf("loading root from blockstore: %w", err) 103 108 } 104 109 105 110 if sc.Version != ATP_REPO_VERSION && sc.Version != ATP_REPO_VERSION_2 { 106 - return fmt.Errorf("unsupported repo version: %d", sc.Version) 111 + return "", fmt.Errorf("unsupported repo version: %d", sc.Version) 107 112 } 108 113 // TODO: verify that signature 109 114 ··· 118 123 119 124 return cb(k, val, blk.RawData()) 120 125 }); err != nil { 121 - return fmt.Errorf("failed to walk mst: %w", err) 126 + return "", fmt.Errorf("failed to walk mst: %w", err) 122 127 } 123 128 124 - return nil 129 + return sc.Rev, nil 125 130 }