this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Skip events that are already accounted for when backfilling (#509)

authored by

Whyrusleeping and committed by
GitHub
e711d4a7 33491ca7

+14 -1
+4 -1
backfill/backfill.go
··· 96 96 // ErrEventGap is returned when an event is received with a since that doesn't match the current rev 97 97 var ErrEventGap = fmt.Errorf("buffered event revs did not line up") 98 98 99 + // ErrAlreadyProcessed is returned when attempting to buffer an event that has already been accounted for (rev older than current) 100 + var ErrAlreadyProcessed = fmt.Errorf("event already accounted for") 101 + 99 102 var tracer = otel.Tracer("backfiller") 100 103 101 104 type BackfillOptions struct { ··· 500 503 } 501 504 502 505 buffered, err := bf.BufferOps(ctx, evt.Repo, evt.Since, evt.Rev, ops) 503 - if err != nil { 506 + if err != nil && !errors.Is(err, ErrAlreadyProcessed) { 504 507 return fmt.Errorf("buffer ops failed: %w", err) 505 508 } 506 509
+10
backfill/gormstore.go
··· 154 154 return false, fmt.Errorf("invalid job state: %q", j.state) 155 155 } 156 156 157 + if j.rev >= rev { 158 + // we've already accounted for this event 159 + return false, ErrAlreadyProcessed 160 + } 161 + 157 162 j.bufferOps(&opSet{since: since, rev: rev, ops: ops}) 158 163 return true, nil 159 164 } ··· 317 322 if opset.since == nil { 318 323 // TODO: what does this mean? 319 324 return fmt.Errorf("nil since in event after backfill: %w", ErrEventGap) 325 + } 326 + 327 + if j.rev > *opset.since { 328 + // we've already accounted for this event 329 + continue 320 330 } 321 331 322 332 if j.rev != *opset.since {