this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor out models.Uid

+50 -63
-5
cmd/relayered/models/models.go
··· 1 - package models 2 - 3 - import () 4 - 5 - type Uid uint64
+5 -6
cmd/relayered/relay/account.go
··· 10 10 11 11 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 - "github.com/bluesky-social/indigo/cmd/relayered/models" 14 13 "github.com/bluesky-social/indigo/cmd/relayered/relay/slurper" 15 14 "github.com/bluesky-social/indigo/xrpc" 16 15 ··· 24 23 ErrCommitNoUser = errors.New("commit no user") // TODO 25 24 ) 26 25 27 - func (r *Relay) DidToUid(ctx context.Context, did string) (models.Uid, error) { 26 + func (r *Relay) DidToUid(ctx context.Context, did string) (uint64, error) { 28 27 xu, err := r.LookupUserByDid(ctx, did) 29 28 if err != nil { 30 29 return 0, err ··· 58 57 return &u, nil 59 58 } 60 59 61 - func (r *Relay) LookupUserByUID(ctx context.Context, uid models.Uid) (*slurper.Account, error) { 60 + func (r *Relay) LookupUserByUID(ctx context.Context, uid uint64) (*slurper.Account, error) { 62 61 ctx, span := tracer.Start(ctx, "lookupUserByUID") 63 62 defer span.End() 64 63 ··· 293 292 return nil 294 293 } 295 294 296 - func (r *Relay) GetAccountPreviousState(ctx context.Context, uid models.Uid) (*slurper.AccountPreviousState, error) { 295 + func (r *Relay) GetAccountPreviousState(ctx context.Context, uid uint64) (*slurper.AccountPreviousState, error) { 297 296 var prevState slurper.AccountPreviousState 298 297 if err := r.db.First(&prevState, uid).Error; err != nil { 299 298 if errors.Is(err, gorm.ErrRecordNotFound) { ··· 305 304 return &prevState, nil 306 305 } 307 306 308 - func (r *Relay) GetRepoRoot(ctx context.Context, user models.Uid) (cid.Cid, error) { 307 + func (r *Relay) GetRepoRoot(ctx context.Context, uid uint64) (cid.Cid, error) { 309 308 var prevState slurper.AccountPreviousState 310 - err := r.db.First(&prevState, user).Error 309 + err := r.db.First(&prevState, uid).Error 311 310 if err == nil { 312 311 return prevState.Cid.CID, nil 313 312 } else if errors.Is(err, gorm.ErrRecordNotFound) {
+2 -3
cmd/relayered/relay/firehose.go
··· 9 9 10 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 - "github.com/bluesky-social/indigo/cmd/relayered/models" 13 12 "github.com/bluesky-social/indigo/cmd/relayered/relay/slurper" 14 13 "github.com/bluesky-social/indigo/cmd/relayered/stream" 15 14 ··· 318 317 return nil 319 318 } 320 319 321 - func (r *Relay) upsertPrevState(accountID models.Uid, newRootCid *cid.Cid, rev string, seq int64) error { 320 + func (r *Relay) upsertPrevState(uid uint64, newRootCid *cid.Cid, rev string, seq int64) error { 322 321 cidBytes := newRootCid.Bytes() 323 322 return r.db.Exec( 324 323 "INSERT INTO account_previous_states (uid, cid, rev, seq) VALUES (?, ?, ?, ?) ON CONFLICT (uid) DO UPDATE SET cid = EXCLUDED.cid, rev = EXCLUDED.rev, seq = EXCLUDED.seq", 325 - accountID, cidBytes, rev, seq, 324 + uid, cidBytes, rev, seq, 326 325 ).Error 327 326 } 328 327
+6 -7
cmd/relayered/relay/slurper/models.go
··· 5 5 "time" 6 6 7 7 "github.com/bluesky-social/indigo/atproto/syntax" 8 - "github.com/bluesky-social/indigo/cmd/relayered/models" 9 8 10 9 "github.com/ipfs/go-cid" 11 10 "gorm.io/gorm" ··· 17 16 } 18 17 19 18 type AccountPreviousState struct { 20 - Uid models.Uid `gorm:"column:uid;primaryKey"` 21 - Cid DbCID `gorm:"column:cid"` 22 - Rev string `gorm:"column:rev"` 23 - Seq int64 `gorm:"column:seq"` 19 + Uid uint64 `gorm:"column:uid;primaryKey"` 20 + Cid DbCID `gorm:"column:cid"` 21 + Rev string `gorm:"column:rev"` 22 + Seq int64 `gorm:"column:seq"` 24 23 } 25 24 26 25 func (ups *AccountPreviousState) GetCid() cid.Cid { ··· 50 49 } 51 50 52 51 type Account struct { 53 - ID models.Uid `gorm:"primarykey"` 52 + ID uint64 `gorm:"primarykey"` 54 53 CreatedAt time.Time 55 54 UpdatedAt time.Time 56 55 DeletedAt gorm.DeletedAt `gorm:"index"` ··· 73 72 return account.Did 74 73 } 75 74 76 - func (account *Account) GetUid() models.Uid { 75 + func (account *Account) GetUid() uint64 { 77 76 return account.ID 78 77 } 79 78
+7 -8
cmd/relayered/relay/validator/validator.go
··· 13 13 "github.com/bluesky-social/indigo/atproto/identity" 14 14 "github.com/bluesky-social/indigo/atproto/repo" 15 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 - "github.com/bluesky-social/indigo/cmd/relayered/models" 17 16 "github.com/bluesky-social/indigo/cmd/relayered/relay/slurper" 18 17 19 18 "github.com/ipfs/go-cid" ··· 27 26 ErrRevTooFarFuture := fmt.Errorf("new rev is > %s in the future", maxRevFuture) 28 27 29 28 return &Validator{ 30 - userLocks: make(map[models.Uid]*userLock), 29 + userLocks: make(map[uint64]*userLock), 31 30 log: slog.Default().With("system", "validator"), 32 31 directory: directory, 33 32 ··· 40 39 // Validator contains the context and code necessary to validate #commit and #sync messages 41 40 type Validator struct { 42 41 lklk sync.Mutex 43 - userLocks map[models.Uid]*userLock 42 + userLocks map[uint64]*userLock 44 43 45 44 log *slog.Logger 46 45 ··· 59 58 } 60 59 61 60 type NextCommitHandler interface { 62 - HandleCommit(ctx context.Context, host *slurper.PDS, uid models.Uid, did string, commit *comatproto.SyncSubscribeRepos_Commit) error 61 + HandleCommit(ctx context.Context, host *slurper.PDS, uid uint64, did string, commit *comatproto.SyncSubscribeRepos_Commit) error 63 62 } 64 63 65 64 type userLock struct { ··· 68 67 } 69 68 70 69 // lockUser re-serializes access per-user after events may have been fanned out to many worker threads by events/schedulers/parallel 71 - func (val *Validator) lockUser(ctx context.Context, user models.Uid) func() { 70 + func (val *Validator) lockUser(ctx context.Context, uid uint64) func() { 72 71 ctx, span := otel.Tracer("validator").Start(ctx, "userLock") 73 72 defer span.End() 74 73 75 74 val.lklk.Lock() 76 75 77 - ulk, ok := val.userLocks[user] 76 + ulk, ok := val.userLocks[uid] 78 77 if !ok { 79 78 ulk = &userLock{} 80 - val.userLocks[user] = ulk 79 + val.userLocks[uid] = ulk 81 80 } 82 81 83 82 ulk.waiters.Add(1) ··· 95 94 nv := ulk.waiters.Add(-1) 96 95 97 96 if nv == 0 { 98 - delete(val.userLocks, user) 97 + delete(val.userLocks, uid) 99 98 } 100 99 } 101 100 }
+2 -3
cmd/relayered/stream/eventmgr/event_manager.go
··· 7 7 "sync" 8 8 "time" 9 9 10 - "github.com/bluesky-social/indigo/cmd/relayered/models" 11 10 "github.com/bluesky-social/indigo/cmd/relayered/stream" 12 11 "github.com/bluesky-social/indigo/cmd/relayered/stream/persist" 13 12 ··· 236 235 em.subs = append(em.subs, sub) 237 236 } 238 237 239 - func (em *EventManager) TakeDownRepo(ctx context.Context, user models.Uid) error { 240 - return em.persister.TakeDownRepo(ctx, user) 238 + func (em *EventManager) TakeDownRepo(ctx context.Context, uid uint64) error { 239 + return em.persister.TakeDownRepo(ctx, uid) 241 240 } 242 241 243 242 // TODO: remove this?
+4 -5
cmd/relayered/stream/events.go
··· 8 8 "log/slog" 9 9 10 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 - "github.com/bluesky-social/indigo/cmd/relayered/models" 12 11 lexutil "github.com/bluesky-social/indigo/lex/util" 13 12 14 13 cbg "github.com/whyrusleeping/cbor-gen" ··· 41 40 LabelInfo *comatproto.LabelSubscribeLabels_Info 42 41 43 42 // some private fields for internal routing perf 44 - PrivUid models.Uid `json:"-" cborgen:"-"` 45 - PrivPdsId uint `json:"-" cborgen:"-"` 46 - PrivRelevantPds []uint `json:"-" cborgen:"-"` 47 - Preserialized []byte `json:"-" cborgen:"-"` 43 + PrivUid uint64 `json:"-" cborgen:"-"` 44 + PrivPdsId uint `json:"-" cborgen:"-"` 45 + PrivRelevantPds []uint `json:"-" cborgen:"-"` 46 + Preserialized []byte `json:"-" cborgen:"-"` 48 47 } 49 48 50 49 func (evt *XRPCStreamEvent) Serialize(wc io.Writer) error {
+23 -24
cmd/relayered/stream/persist/diskpersist/diskpersist.go
··· 15 15 "time" 16 16 17 17 "github.com/bluesky-social/indigo/api/atproto" 18 - "github.com/bluesky-social/indigo/cmd/relayered/models" 19 18 "github.com/bluesky-social/indigo/cmd/relayered/stream" 20 19 "github.com/bluesky-social/indigo/cmd/relayered/stream/persist" 21 20 arc "github.com/hashicorp/golang-lru/arc/v2" ··· 42 41 curSeq int64 43 42 44 43 uids UidSource 45 - uidCache *arc.ARCCache[models.Uid, string] // TODO: unused 46 - didCache *arc.ARCCache[string, models.Uid] 44 + uidCache *arc.ARCCache[uint64, string] 45 + didCache *arc.ARCCache[string, uint64] 47 46 48 47 writers *sync.Pool 49 48 buffers *sync.Pool ··· 98 97 } 99 98 100 99 type UidSource interface { 101 - DidToUid(ctx context.Context, did string) (models.Uid, error) 100 + DidToUid(ctx context.Context, did string) (uint64, error) 102 101 } 103 102 104 103 func NewDiskPersistence(primaryDir, archiveDir string, db *gorm.DB, opts *DiskPersistOptions) (*DiskPersistence, error) { ··· 106 105 opts = DefaultDiskPersistOptions() 107 106 } 108 107 109 - uidCache, err := arc.NewARC[models.Uid, string](opts.UIDCacheSize) 108 + uidCache, err := arc.NewARC[uint64, string](opts.UIDCacheSize) 110 109 if err != nil { 111 110 return nil, fmt.Errorf("failed to create uid cache: %w", err) 112 111 } 113 112 114 - didCache, err := arc.NewARC[string, models.Uid](opts.DIDCacheSize) 113 + didCache, err := arc.NewARC[string, uint64](opts.DIDCacheSize) 115 114 if err != nil { 116 115 return nil, fmt.Errorf("failed to create did cache: %w", err) 117 116 } ··· 570 569 // only those two get peristed right now 571 570 } 572 571 573 - usr, err := dp.uidForDid(ctx, did) 572 + uid, err := dp.uidForDid(ctx, did) 574 573 if err != nil { 575 574 return err 576 575 } ··· 584 583 // Set event length in header 585 584 binary.LittleEndian.PutUint32(b[8:], uint32(len(b)-headerSize)) 586 585 // Set user UID in header 587 - binary.LittleEndian.PutUint64(b[12:], uint64(usr)) 586 + binary.LittleEndian.PutUint64(b[12:], uint64(uid)) 588 587 // set seq at [20:] inside mutex section inside doPersist 589 588 590 589 return dp.addJobToQueue(ctx, persistJob{ ··· 598 597 Flags uint32 599 598 Kind uint32 600 599 Seq int64 601 - Usr models.Uid 600 + Usr uint64 602 601 Len uint32 603 602 } 604 603 ··· 622 621 flags := binary.LittleEndian.Uint32(scratch[:4]) 623 622 kind := binary.LittleEndian.Uint32(scratch[4:8]) 624 623 l := binary.LittleEndian.Uint32(scratch[8:12]) 625 - usr := binary.LittleEndian.Uint64(scratch[12:20]) 624 + uid := binary.LittleEndian.Uint64(scratch[12:20]) 626 625 seq := binary.LittleEndian.Uint64(scratch[20:28]) 627 626 628 627 return &evtHeader{ 629 628 Flags: flags, 630 629 Kind: kind, 631 630 Len: l, 632 - Usr: models.Uid(usr), 631 + Usr: uid, 633 632 Seq: int64(seq), 634 633 }, nil 635 634 } ··· 653 652 return nil 654 653 } 655 654 656 - func (dp *DiskPersistence) uidForDid(ctx context.Context, did string) (models.Uid, error) { 655 + func (dp *DiskPersistence) uidForDid(ctx context.Context, did string) (uint64, error) { 657 656 if uid, ok := dp.didCache.Get(did); ok { 658 657 return uid, nil 659 658 } ··· 850 849 type UserAction struct { 851 850 gorm.Model 852 851 853 - Usr models.Uid 852 + Usr int64 854 853 RebaseAt int64 855 854 Takedown bool 856 855 } 857 856 858 - func (dp *DiskPersistence) TakeDownRepo(ctx context.Context, usr models.Uid) error { 857 + func (dp *DiskPersistence) TakeDownRepo(ctx context.Context, uid uint64) error { 859 858 /* 860 859 if err := p.meta.Create(&UserAction{ 861 - Usr: usr, 860 + Usr: uid, 862 861 Takedown: true, 863 862 }).Error; err != nil { 864 863 return err 865 864 } 866 865 */ 867 866 868 - return dp.forEachShardWithUserEvents(ctx, usr, func(ctx context.Context, fn string) error { 869 - if err := dp.deleteEventsForUser(ctx, usr, fn); err != nil { 867 + return dp.forEachShardWithUserEvents(ctx, uid, func(ctx context.Context, fn string) error { 868 + if err := dp.deleteEventsForUser(ctx, uid, fn); err != nil { 870 869 return err 871 870 } 872 871 ··· 874 873 }) 875 874 } 876 875 877 - func (dp *DiskPersistence) forEachShardWithUserEvents(ctx context.Context, usr models.Uid, cb func(context.Context, string) error) error { 876 + func (dp *DiskPersistence) forEachShardWithUserEvents(ctx context.Context, uid uint64, cb func(context.Context, string) error) error { 878 877 var refs []LogFileRef 879 878 if err := dp.meta.Order("created_at desc").Find(&refs).Error; err != nil { 880 879 return err 881 880 } 882 881 883 882 for _, r := range refs { 884 - mhas, err := dp.refMaybeHasUserEvents(ctx, usr, r) 883 + mhas, err := dp.refMaybeHasUserEvents(ctx, uid, r) 885 884 if err != nil { 886 885 return err 887 886 } ··· 903 902 return nil 904 903 } 905 904 906 - func (dp *DiskPersistence) refMaybeHasUserEvents(ctx context.Context, usr models.Uid, ref LogFileRef) (bool, error) { 905 + func (dp *DiskPersistence) refMaybeHasUserEvents(ctx context.Context, uid uint64, ref LogFileRef) (bool, error) { 907 906 // TODO: lazily computed bloom filters for users in each logfile 908 907 return true, nil 909 908 } ··· 917 916 return len(p), nil 918 917 } 919 918 920 - func (dp *DiskPersistence) deleteEventsForUser(ctx context.Context, usr models.Uid, fn string) error { 921 - return dp.mutateUserEventsInLog(ctx, usr, fn, EvtFlagTakedown, true) 919 + func (dp *DiskPersistence) deleteEventsForUser(ctx context.Context, uid uint64, fn string) error { 920 + return dp.mutateUserEventsInLog(ctx, uid, fn, EvtFlagTakedown, true) 922 921 } 923 922 924 - func (dp *DiskPersistence) mutateUserEventsInLog(ctx context.Context, usr models.Uid, fn string, flag uint32, zeroEvts bool) error { 923 + func (dp *DiskPersistence) mutateUserEventsInLog(ctx context.Context, uid uint64, fn string, flag uint32, zeroEvts bool) error { 925 924 fi, err := os.OpenFile(fn, os.O_RDWR, 0) 926 925 if err != nil { 927 926 return fmt.Errorf("failed to open log file: %w", err) ··· 941 940 return err 942 941 } 943 942 944 - if h.Usr == usr && h.Flags&flag == 0 { 943 + if h.Usr == uid && h.Flags&flag == 0 { 945 944 nflag := h.Flags | flag 946 945 947 946 binary.LittleEndian.PutUint32(scratch, nflag)
+1 -2
cmd/relayered/stream/persist/persist.go
··· 3 3 import ( 4 4 "context" 5 5 6 - "github.com/bluesky-social/indigo/cmd/relayered/models" 7 6 "github.com/bluesky-social/indigo/cmd/relayered/stream" 8 7 ) 9 8 ··· 11 10 type EventPersistence interface { 12 11 Persist(ctx context.Context, e *stream.XRPCStreamEvent) error 13 12 Playback(ctx context.Context, since int64, cb func(*stream.XRPCStreamEvent) error) error 14 - TakeDownRepo(ctx context.Context, usr models.Uid) error 13 + TakeDownRepo(ctx context.Context, uid uint64) error 15 14 Flush(context.Context) error 16 15 Shutdown(context.Context) error 17 16