this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

handle rev better

authored by

whyrusleeping and committed by
whyrusleeping
dc2c8da7 404f5ce7

+14 -9
+6 -3
backfill/backfill.go
··· 425 425 recordResults := make(chan recordResult, numRoutines) 426 426 427 427 var rev string 428 + // guaranteed to be called before any items are send on the recordQueue channel 429 + setRev := func(s string) { 430 + rev = s 431 + } 432 + 428 433 // Producer routine 429 434 go func() { 430 435 defer close(recordQueue) 431 - rrev, err := repo.StreamRepoRecords(ctx, r, b.NSIDFilter, func(recordPath string, nodeCid cid.Cid, data []byte) error { 436 + err := repo.StreamRepoRecords(ctx, r, b.NSIDFilter, setRev, func(recordPath string, nodeCid cid.Cid, data []byte) error { 432 437 numRecords++ 433 438 recordQueue <- recordQueueItem{recordPath: recordPath, nodeCid: nodeCid, data: data} 434 439 return nil ··· 436 441 if err != nil { 437 442 log.Error("failed to iterate records in repo", "err", err) 438 443 } 439 - 440 - rev = rrev 441 444 }() 442 445 443 446 // Consumer routines
+8 -6
repo/stream.go
··· 111 111 return fmt.Errorf("put is not needed") 112 112 } 113 113 114 - func StreamRepoRecords(ctx context.Context, r io.Reader, prefix string, cb func(k string, c cid.Cid, v []byte) error) (string, error) { 114 + func StreamRepoRecords(ctx context.Context, r io.Reader, prefix string, setRev func(string), cb func(k string, c cid.Cid, v []byte) error) error { 115 115 ctx, span := otel.Tracer("repo").Start(ctx, "RepoStream") 116 116 defer span.End() 117 117 118 118 br, root, err := carutil.NewReader(bufio.NewReader(r)) 119 119 if err != nil { 120 - return "", fmt.Errorf("opening CAR block reader: %w", err) 120 + return fmt.Errorf("opening CAR block reader: %w", err) 121 121 } 122 122 123 123 bs := newStreamingBlockstore(br) ··· 126 126 127 127 var sc SignedCommit 128 128 if err := cst.Get(ctx, root, &sc); err != nil { 129 - return "", fmt.Errorf("loading root from blockstore: %w", err) 129 + return fmt.Errorf("loading root from blockstore: %w", err) 130 130 } 131 131 132 132 if sc.Version != ATP_REPO_VERSION && sc.Version != ATP_REPO_VERSION_2 { 133 - return "", fmt.Errorf("unsupported repo version: %d", sc.Version) 133 + return fmt.Errorf("unsupported repo version: %d", sc.Version) 134 134 } 135 135 136 136 // TODO: verify that signature 137 + 138 + setRev(sc.Rev) 137 139 138 140 t := mst.LoadMST(cst, sc.Data) 139 141 ··· 147 149 148 150 return nil 149 151 }); err != nil { 150 - return "", fmt.Errorf("failed to walk mst: %w", err) 152 + return fmt.Errorf("failed to walk mst: %w", err) 151 153 } 152 154 153 - return sc.Rev, nil 155 + return nil 154 156 }