this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix sequence output for playback

+40 -22
+35 -16
events/diskpersist.go
··· 60 60 61 61 const ( 62 62 EvtFlagTakedown = 1 << iota 63 + EvtFlagRebased 63 64 ) 64 65 65 66 var _ (EventPersistence) = (*DiskPersistence)(nil) ··· 221 222 return 0, err 222 223 } 223 224 224 - if end > 0 && eh.Seq >= end { 225 + if end > 0 && eh.Seq > end { 225 226 // return to beginning of offset 226 227 n, err := fi.Seek(offset, io.SeekStart) 227 228 if err != nil { ··· 493 494 } 494 495 495 496 func (p *DiskPersistence) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error { 497 + base := since - (since * p.eventsPerFile) 496 498 var logs []LogFileRef 497 - if err := p.meta.Debug().Order("seq_start asc").Find(&logs, "seq_start >= ?", since%p.eventsPerFile).Error; err != nil { 499 + if err := p.meta.Debug().Order("seq_start asc").Find(&logs, "seq_start >= ?", base).Error; err != nil { 498 500 return err 499 501 } 500 502 ··· 508 510 return nil 509 511 } 510 512 513 + func postDoNotEmit(flags uint32) bool { 514 + if flags&(EvtFlagRebased|EvtFlagTakedown) != 0 { 515 + return true 516 + } 517 + 518 + return false 519 + } 520 + 511 521 func (p *DiskPersistence) readEventsFrom(ctx context.Context, since int64, fn string, cb func(*XRPCStreamEvent) error) error { 512 522 fi, err := os.OpenFile(fn, os.O_RDONLY, 0) 513 523 if err != nil { ··· 534 544 return err 535 545 } 536 546 537 - if h.Flags&EvtFlagTakedown != 0 { 547 + if postDoNotEmit(h.Flags) { 538 548 // event taken down, skip 539 549 _, err := io.CopyN(io.Discard, bufr, h.Len) // would be really nice if the buffered reader had a 'skip' method that does a seek under the hood 540 550 if err != nil { ··· 640 650 } 641 651 642 652 func (p *DiskPersistence) deleteEventsForUser(ctx context.Context, usr models.Uid, fn string) error { 653 + return p.mutateUserEventsInLog(ctx, usr, fn, EvtFlagTakedown, true) 654 + } 655 + 656 + func (p *DiskPersistence) mutateUserEventsInLog(ctx context.Context, usr models.Uid, fn string, flag uint32, zeroEvts bool) error { 643 657 fi, err := os.OpenFile(fn, os.O_RDWR, 0) 644 658 if err != nil { 645 659 return fmt.Errorf("failed to open log file: %w", err) 646 660 } 647 661 defer fi.Close() 662 + defer fi.Sync() 648 663 649 664 scratch := make([]byte, headerSize) 650 665 var offset int64 ··· 658 673 return err 659 674 } 660 675 661 - if h.Usr == usr && h.Flags&EvtFlagTakedown == 0 { 662 - nflag := h.Flags | EvtFlagTakedown 676 + if h.Usr == usr && h.Flags&flag == 0 { 677 + nflag := h.Flags | flag 663 678 664 679 binary.LittleEndian.PutUint32(scratch, nflag) 665 680 ··· 667 682 return fmt.Errorf("failed to write updated flag value: %w", err) 668 683 } 669 684 670 - // sync that write before blanking the event data 671 - if err := fi.Sync(); err != nil { 672 - return err 673 - } 685 + if zeroEvts { 686 + // sync that write before blanking the event data 687 + if err := fi.Sync(); err != nil { 688 + return err 689 + } 674 690 675 - if _, err := fi.Seek(offset+headerSize, io.SeekStart); err != nil { 676 - return fmt.Errorf("failed to seek: %w", err) 677 - } 691 + if _, err := fi.Seek(offset+headerSize, io.SeekStart); err != nil { 692 + return fmt.Errorf("failed to seek: %w", err) 693 + } 678 694 679 - _, err := io.CopyN(fi, &zeroReader{}, h.Len) 680 - if err != nil { 681 - return err 695 + _, err := io.CopyN(fi, &zeroReader{}, h.Len) 696 + if err != nil { 697 + return err 698 + } 682 699 } 683 700 } 684 701 ··· 691 708 } 692 709 693 710 func (p *DiskPersistence) RebaseRepoEvents(ctx context.Context, usr models.Uid) error { 694 - panic("todo") 711 + return p.forEachShardWithUserEvents(ctx, usr, func(ctx context.Context, fn string) error { 712 + return p.mutateUserEventsInLog(ctx, usr, fn, EvtFlagRebased, false) 713 + }) 695 714 } 696 715 697 716 func (p *DiskPersistence) Flush(ctx context.Context) error {
+5 -6
testing/utils.go
··· 429 429 430 430 notifman := notifs.NewNotificationManager(maindb, repoman.GetRecord) 431 431 432 - dbpersist, err := events.NewDbPersistence(maindb, cs, nil) 433 - if err != nil { 434 - return nil, err 435 - } 432 + opts := events.DefaultDiskPersistOptions() 433 + opts.EventsPerFile = 10 434 + diskpersist, err := events.NewDiskPersistence(filepath.Join(dir, "dp-primary"), filepath.Join(dir, "dp-archive"), maindb, opts) 436 435 437 - evtman := events.NewEventManager(dbpersist) 436 + evtman := events.NewEventManager(diskpersist) 438 437 439 438 ix, err := indexer.NewIndexer(maindb, notifman, evtman, didr, repoman, true, true) 440 439 if err != nil { ··· 527 526 go func() { 528 527 rsc := &events.RepoStreamCallbacks{ 529 528 RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { 530 - fmt.Println("received event: ", evt.Seq, evt.Repo) 531 529 es.Lk.Lock() 532 530 es.Events = append(es.Events, &events.XRPCStreamEvent{RepoCommit: evt}) 531 + fmt.Println("received event: ", evt.Seq, evt.Repo, len(es.Events)) 533 532 es.Lk.Unlock() 534 533 return nil 535 534 },