this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge remote-tracking branch 'origin/feat/splitter-rebase' into feat/splitter-rebase

+8 -24
+4 -4
events/events.go
··· 362 362 case <-done: 363 363 return ErrPlaybackShutdown 364 364 case out <- e: 365 - seq := sequenceForEvent(e) 365 + seq := SequenceForEvent(e) 366 366 if seq > 0 { 367 367 lastSeq = seq 368 368 } ··· 387 387 388 388 // run playback again to get us to the events that have started buffering 389 389 if err := em.persister.Playback(ctx, lastSeq, func(e *XRPCStreamEvent) error { 390 - seq := sequenceForEvent(e) 391 - if seq > sequenceForEvent(first) { 390 + seq := SequenceForEvent(e) 391 + if seq > SequenceForEvent(first) { 392 392 return ErrCaughtUp 393 393 } 394 394 ··· 423 423 return out, sub.cleanup, nil 424 424 } 425 425 426 - func sequenceForEvent(evt *XRPCStreamEvent) int64 { 426 + func SequenceForEvent(evt *XRPCStreamEvent) int64 { 427 427 return evt.Sequence() 428 428 } 429 429
+2 -2
splitter/ringbuf.go
··· 96 96 for ; i >= 0; i-- { 97 97 c := chunks[i] 98 98 evts := c.events() 99 - if since > sequenceForEvent(evts[len(evts)-1]) { 99 + if since > events.SequenceForEvent(evts[len(evts)-1]) { 100 100 i++ 101 101 break 102 102 } ··· 112 112 for nread < len(evts) { 113 113 for _, e := range evts[nread:] { 114 114 nread++ 115 - seq := sequenceForEvent(e) 115 + seq := events.SequenceForEvent(e) 116 116 if seq <= since { 117 117 continue 118 118 }
+2 -18
splitter/splitter.go
··· 399 399 defer cancel() 400 400 401 401 sched := sequential.NewScheduler("splitter", func(ctx context.Context, evt *events.XRPCStreamEvent) error { 402 - seq := sequenceForEvent(evt) 402 + seq := events.SequenceForEvent(evt) 403 403 if seq < 0 { 404 404 // ignore info events and other unsupported types 405 405 return nil ··· 418 418 *lastCursor = seq 419 419 return nil 420 420 }) 421 - return events.HandleRepoStream(ctx, con, sched) 422 - } 423 421 424 - func sequenceForEvent(evt *events.XRPCStreamEvent) int64 { 425 - switch { 426 - case evt.RepoCommit != nil: 427 - return evt.RepoCommit.Seq 428 - case evt.RepoHandle != nil: 429 - return evt.RepoHandle.Seq 430 - case evt.RepoMigrate != nil: 431 - return evt.RepoMigrate.Seq 432 - case evt.RepoTombstone != nil: 433 - return evt.RepoTombstone.Seq 434 - case evt.RepoInfo != nil: 435 - return -1 436 - default: 437 - return -1 438 - } 422 + return events.HandleRepoStream(ctx, con, sched) 439 423 } 440 424 441 425 func (s *Splitter) getLastCursor() (int64, error) {