this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Better backfill flow (#525)

authored by

Jaz and committed by
GitHub
c029c5cf c0816814

+34 -53
+34 -53
backfill/backfill.go
··· 7 7 "fmt" 8 8 "log/slog" 9 9 "net/http" 10 + "strings" 10 11 "sync" 11 12 "time" 12 13 ··· 16 17 "github.com/ipfs/go-cid" 17 18 "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" 18 19 "go.opentelemetry.io/otel" 20 + "golang.org/x/sync/semaphore" 19 21 "golang.org/x/time/rate" 20 22 ) 21 23 ··· 149 151 log := slog.With("source", "backfiller", "name", b.Name) 150 152 log.Info("starting backfill processor") 151 153 152 - sem := make(chan struct{}, b.ParallelBackfills) 154 + sem := semaphore.NewWeighted(int64(b.ParallelBackfills)) 153 155 154 156 for { 155 157 select { ··· 171 173 continue 172 174 } 173 175 176 + log := log.With("repo", job.Repo()) 177 + 174 178 // Mark the backfill as "in progress" 175 179 err = job.SetState(ctx, StateInProgress) 176 180 if err != nil { ··· 178 182 continue 179 183 } 180 184 181 - sem <- struct{}{} 185 + sem.Acquire(ctx, 1) 182 186 go func(j Job) { 183 - b.BackfillRepo(ctx, j) 187 + defer sem.Release(1) 188 + newState, err := b.BackfillRepo(ctx, j) 189 + if err != nil { 190 + log.Error("failed to backfill repo", "error", err) 191 + } 192 + if newState != "" { 193 + if sserr := j.SetState(ctx, newState); sserr != nil { 194 + log.Error("failed to set job state", "error", sserr) 195 + } 196 + 197 + if strings.HasPrefix(newState, "failed") { 198 + // Clear buffered ops 199 + if err := j.ClearBufferedOps(ctx); err != nil { 200 + log.Error("failed to clear buffered ops", "error", err) 201 + } 202 + } 203 + } 184 204 backfillJobsProcessed.WithLabelValues(b.Name).Inc() 185 - <-sem 186 205 }(job) 187 206 } 188 207 } ··· 262 281 } 263 282 264 283 // BackfillRepo backfills a repo 265 - func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) { 284 + func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error) { 266 285 ctx, span := tracer.Start(ctx, "BackfillRepo") 267 286 defer span.End() 268 287 ··· 290 309 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 291 310 if err != nil { 292 311 state := fmt.Sprintf("failed (create request: %s)", err.Error()) 293 - // Mark the job as "failed" 294 - err := job.SetState(ctx, state) 295 - if err != nil { 296 - log.Error("failed to set job state", "error", err) 297 - } 298 - 299 - log.Error("failed to create request", "error", err) 300 - return 312 + return state, fmt.Errorf("failed to create request: %w", err) 301 313 } 302 314 303 315 req.Header.Set("Accept", "application/vnd.ipld.car") ··· 311 323 resp, err := client.Do(req) 312 324 if err != nil { 313 325 state := fmt.Sprintf("failed (do request: %s)", err.Error()) 314 - // Mark the job as "failed" 315 - err := job.SetState(ctx, state) 316 - if err != nil { 317 - log.Error("failed to set job state", "error", err) 318 - } 319 - 320 - log.Error("failed to send request", "error", err) 321 - return 326 + return state, fmt.Errorf("failed to send request: %w", err) 322 327 } 323 328 324 329 if resp.StatusCode != http.StatusOK { 325 - log.Info("failed to get repo", "status", resp.StatusCode) 326 330 reason := "unknown error" 327 331 if resp.StatusCode == http.StatusBadRequest { 328 332 reason = "repo not found" 333 + } else { 334 + reason = resp.Status 329 335 } 330 336 state := fmt.Sprintf("failed (%s)", reason) 331 - 332 - // Mark the job as "failed" 333 - err := job.SetState(ctx, state) 334 - if err != nil { 335 - log.Error("failed to set job state", "error", err) 336 - } 337 - 338 - // Clear buffered ops 339 - err = job.ClearBufferedOps(ctx) 340 - if err != nil { 341 - log.Error("failed to clear buffered ops", "error", err) 342 - } 343 - return 337 + return state, fmt.Errorf("failed to get repo: %s", reason) 344 338 } 345 339 346 340 instrumentedReader := instrumentedReader{ ··· 352 346 353 347 r, err := repo.ReadRepoFromCar(ctx, instrumentedReader) 354 348 if err != nil { 355 - log.Error("failed to read repo from car", "error", err) 356 - 357 349 state := "failed (couldn't read repo CAR from response body)" 358 - 359 - // Mark the job as "failed" 360 - err := job.SetState(ctx, state) 361 - if err != nil { 362 - log.Error("failed to set job state", "error", err) 363 - } 364 - 365 - // Clear buffered ops 366 - err = job.ClearBufferedOps(ctx) 367 - if err != nil { 368 - log.Error("failed to clear buffered ops", "error", err) 369 - } 370 - return 350 + return state, fmt.Errorf("failed to read repo from car: %w", err) 371 351 } 372 352 373 353 numRecords := 0 ··· 375 355 recordQueue := make(chan recordQueueItem, numRoutines) 376 356 recordResults := make(chan recordResult, numRoutines) 377 357 378 - wg := sync.WaitGroup{} 379 - 380 358 // Producer routine 381 359 go func() { 382 360 defer close(recordQueue) ··· 385 363 recordQueue <- recordQueueItem{recordPath: recordPath, nodeCid: nodeCid} 386 364 return nil 387 365 }); err != nil { 388 - log.Error("failed to iterated records in repo", "err", err) 366 + log.Error("failed to iterate records in repo", "err", err) 389 367 } 390 368 }() 391 369 392 370 rev := r.SignedCommit().Rev 393 371 394 372 // Consumer routines 373 + wg := sync.WaitGroup{} 395 374 for i := 0; i < numRoutines; i++ { 396 375 wg.Add(1) 397 376 go func() { ··· 445 424 "records_backfilled", numRecords, 446 425 "duration", time.Since(start), 447 426 ) 427 + 428 + return StateComplete, nil 448 429 } 449 430 450 431 const trust = true