this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

review tweaks

authored by

Brian Olson and committed by
Brian Olson
502cf77b 98159ad5

+18 -25
+18 -25
bgs/compactor.go
··· 2 2 3 3 import ( 4 4 "context" 5 - crypto_rand "crypto/rand" 6 5 "fmt" 7 6 "math/rand/v2" 8 7 "sync" ··· 24 23 q []queueItem 25 24 members map[models.Uid]struct{} 26 25 lk sync.Mutex 27 - rnd *rand.Rand 28 26 } 29 27 30 28 // Append appends a uid to the end of the queue if it doesn't already exist ··· 116 114 item = q.q[0] 117 115 q.q = nil 118 116 } else { 119 - pos := q.rnd.IntN(len(q.q)) 117 + pos := rand.IntN(len(q.q)) 120 118 item = q.q[pos] 121 119 last := len(q.q) - 1 122 120 q.q[pos] = q.q[last] ··· 172 170 RequeueFast: true, 173 171 NumWorkers: 2, 174 172 } 175 - } 176 - 177 - func newRandFromRoot() *rand.Rand { 178 - var seed [32]byte 179 - crypto_rand.Read(seed[:]) 180 - chacha := rand.NewChaCha8(seed) 181 - return rand.New(chacha) 182 173 } 183 174 184 175 func NewCompactor(opts *CompactorOptions) *Compactor { ··· 189 180 return &Compactor{ 190 181 q: &uniQueue{ 191 182 members: make(map[models.Uid]struct{}), 192 - rnd: newRandFromRoot(), 193 183 }, 194 184 exit: make(chan struct{}), 195 185 requeueInterval: opts.RequeueInterval, ··· 301 291 NextRandom 302 292 ) 303 293 304 - func (c *Compactor) compactNext(ctx context.Context, bgs *BGS, strategy NextStrategy) (state CompactorState, err error) { 294 + func (c *Compactor) compactNext(ctx context.Context, bgs *BGS, strategy NextStrategy) (CompactorState, error) { 305 295 ctx, span := otel.Tracer("compactor").Start(ctx, "CompactNext") 306 296 defer span.End() 307 297 ··· 314 304 item, ok = c.q.Pop() 315 305 } 316 306 if !ok { 317 - err = errNoReposToCompact 318 - return 307 + return CompactorState{}, errNoReposToCompact 319 308 } 320 309 321 - state.set(item.uid, "unknown", "getting_user", nil) 310 + state := CompactorState{ 311 + latestUID: item.uid, 312 + latestDID: "unknown", 313 + status: "getting_user", 314 + } 322 315 323 316 user, err := bgs.lookupUserByUID(ctx, item.uid) 324 317 if err != nil { 325 318 span.RecordError(err) 326 - state.set(item.uid, "unknown", "failed_getting_user", nil) 327 - err = fmt.Errorf("failed to get user %d: %w", item.uid, err) 328 - return 319 + state.status = "failed_getting_user" 320 + err := fmt.Errorf("failed to get user %d: %w", item.uid, err) 321 + return state, err 329 322 } 330 323 331 324 span.SetAttributes(attribute.String("repo", user.Did), attribute.Int("uid", int(item.uid))) 332 325 333 - state.set(item.uid, user.Did, "compacting", nil) 326 + state.latestDID = user.Did 334 327 335 328 start := time.Now() 336 329 st, err := bgs.repoman.CarStore().CompactUserShards(ctx, item.uid, item.fast) 337 330 if err != nil { 338 331 span.RecordError(err) 339 - state.set(item.uid, user.Did, "failed_compacting", nil) 340 - err = fmt.Errorf("failed to compact shards for user %d: %w", item.uid, err) 341 - return 332 + state.status = "failed_compacting" 333 + err := fmt.Errorf("failed to compact shards for user %d: %w", item.uid, err) 334 + return state, err 342 335 } 343 336 compactionDuration.Observe(time.Since(start).Seconds()) 344 337 ··· 350 343 attribute.Int("refs", st.TotalRefs), 351 344 ) 352 345 353 - state.set(item.uid, user.Did, "done", st) 346 + state.status = "done" 347 + state.stats = st 354 348 355 - err = nil 356 - return 349 + return state, nil 357 350 } 358 351 359 352 func (c *Compactor) EnqueueRepo(ctx context.Context, user User, fast bool) {