this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

move db models to separate relay package

+105 -118
+3 -4
cmd/relayered/handlers.go
··· 12 12 13 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 14 "github.com/bluesky-social/indigo/cmd/relayered/relay" 15 - "github.com/bluesky-social/indigo/cmd/relayered/relay/slurper" 16 15 "github.com/bluesky-social/indigo/xrpc" 17 16 18 17 "github.com/labstack/echo/v4" ··· 166 165 } 167 166 168 167 ustatus := u.GetUpstreamStatus() 169 - if ustatus == slurper.AccountStatusTakendown { 168 + if ustatus == relay.AccountStatusTakendown { 170 169 return nil, fmt.Errorf("account was taken down by its PDS") 171 170 } 172 171 173 - if ustatus == slurper.AccountStatusDeactivated { 172 + if ustatus == relay.AccountStatusDeactivated { 174 173 return nil, fmt.Errorf("account is temporarily deactivated") 175 174 } 176 175 177 - if ustatus == slurper.AccountStatusSuspended { 176 + if ustatus == relay.AccountStatusSuspended { 178 177 return nil, fmt.Errorf("account is suspended by its PDS") 179 178 } 180 179
+22 -22
cmd/relayered/relay/account.go
··· 10 10 11 11 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 - "github.com/bluesky-social/indigo/cmd/relayered/relay/slurper" 13 + "github.com/bluesky-social/indigo/cmd/relayered/relay/models" 14 14 "github.com/bluesky-social/indigo/xrpc" 15 15 16 16 "github.com/ipfs/go-cid" ··· 34 34 return xu.ID, nil 35 35 } 36 36 37 - func (r *Relay) LookupUserByDid(ctx context.Context, did string) (*slurper.Account, error) { 37 + func (r *Relay) LookupUserByDid(ctx context.Context, did string) (*models.Account, error) { 38 38 ctx, span := tracer.Start(ctx, "lookupUserByDid") 39 39 defer span.End() 40 40 ··· 43 43 return cu, nil 44 44 } 45 45 46 - var u slurper.Account 46 + var u models.Account 47 47 if err := r.db.Find(&u, "did = ?", did).Error; err != nil { 48 48 return nil, err 49 49 } ··· 57 57 return &u, nil 58 58 } 59 59 60 - func (r *Relay) LookupUserByUID(ctx context.Context, uid uint64) (*slurper.Account, error) { 60 + func (r *Relay) LookupUserByUID(ctx context.Context, uid uint64) (*models.Account, error) { 61 61 ctx, span := tracer.Start(ctx, "lookupUserByUID") 62 62 defer span.End() 63 63 64 - var u slurper.Account 64 + var u models.Account 65 65 if err := r.db.Find(&u, "id = ?", uid).Error; err != nil { 66 66 return nil, err 67 67 } ··· 73 73 return &u, nil 74 74 } 75 75 76 - func (r *Relay) newUser(ctx context.Context, host *slurper.PDS, did string) (*slurper.Account, error) { 76 + func (r *Relay) newUser(ctx context.Context, host *models.PDS, did string) (*models.Account, error) { 77 77 newUsersDiscovered.Inc() 78 78 start := time.Now() 79 79 account, err := r.syncPDSAccount(ctx, did, host, nil) ··· 90 90 // did is the user 91 91 // host is the PDS we received this from, not necessarily the canonical PDS in the DID document 92 92 // cachedAccount is (optionally) the account that we have already looked up from cache or database 93 - func (r *Relay) syncPDSAccount(ctx context.Context, did string, host *slurper.PDS, cachedAccount *slurper.Account) (*slurper.Account, error) { 93 + func (r *Relay) syncPDSAccount(ctx context.Context, did string, host *models.PDS, cachedAccount *models.Account) (*models.Account, error) { 94 94 ctx, span := tracer.Start(ctx, "syncPDSAccount") 95 95 defer span.End() 96 96 ··· 132 132 durl.Scheme = "http" 133 133 } 134 134 135 - var canonicalHost *slurper.PDS 135 + var canonicalHost *models.PDS 136 136 if host.Host == durl.Host { 137 137 // we got the message from the canonical PDS, convenient! 138 138 canonicalHost = host 139 139 } else { 140 140 // we got the message from an intermediate relay 141 141 // check our db for info on canonical PDS 142 - var peering slurper.PDS 142 + var peering models.PDS 143 143 if err := r.db.Find(&peering, "host = ?", durl.Host).Error; err != nil { 144 144 r.Logger.Error("failed to find pds", "host", durl.Host) 145 145 return nil, err ··· 217 217 err = r.db.Transaction(func(tx *gorm.DB) error { 218 218 if caPDS != 0 { 219 219 // decrement prior PDS's account count 220 - tx.Model(&slurper.PDS{}).Where("id = ?", caPDS).Update("repo_count", gorm.Expr("repo_count - 1")) 220 + tx.Model(&models.PDS{}).Where("id = ?", caPDS).Update("repo_count", gorm.Expr("repo_count - 1")) 221 221 } 222 222 // update user's PDS ID 223 - res := tx.Model(slurper.Account{}).Where("id = ?", cachedAccount.ID).Update("pds", canonicalHost.ID) 223 + res := tx.Model(models.Account{}).Where("id = ?", cachedAccount.ID).Update("pds", canonicalHost.ID) 224 224 if res.Error != nil { 225 225 return fmt.Errorf("failed to update users pds: %w", res.Error) 226 226 } 227 227 // increment new PDS's account count 228 - res = tx.Model(&slurper.PDS{}).Where("id = ? AND repo_count < repo_limit", canonicalHost.ID).Update("repo_count", gorm.Expr("repo_count + 1")) 228 + res = tx.Model(&models.PDS{}).Where("id = ? AND repo_count < repo_limit", canonicalHost.ID).Update("repo_count", gorm.Expr("repo_count + 1")) 229 229 return nil 230 230 }) 231 231 ··· 234 234 return cachedAccount, nil 235 235 } 236 236 237 - newAccount := slurper.Account{ 237 + newAccount := models.Account{ 238 238 Did: did, 239 239 PDS: canonicalHost.ID, 240 240 } 241 241 242 242 err = r.db.Transaction(func(tx *gorm.DB) error { 243 - res := tx.Model(&slurper.PDS{}).Where("id = ? AND repo_count < repo_limit", canonicalHost.ID).Update("repo_count", gorm.Expr("repo_count + 1")) 243 + res := tx.Model(&models.PDS{}).Where("id = ? AND repo_count < repo_limit", canonicalHost.ID).Update("repo_count", gorm.Expr("repo_count + 1")) 244 244 if res.Error != nil { 245 245 return fmt.Errorf("failed to increment repo count for pds %q: %w", canonicalHost.Host, res.Error) 246 246 } ··· 266 266 return err 267 267 } 268 268 269 - if err := r.db.Model(slurper.Account{}).Where("id = ?", u.ID).Update("taken_down", true).Error; err != nil { 269 + if err := r.db.Model(models.Account{}).Where("id = ?", u.ID).Update("taken_down", true).Error; err != nil { 270 270 return err 271 271 } 272 272 u.SetTakenDown(true) ··· 282 282 return err 283 283 } 284 284 285 - if err := r.db.Model(slurper.Account{}).Where("id = ?", u.ID).Update("taken_down", false).Error; err != nil { 285 + if err := r.db.Model(models.Account{}).Where("id = ?", u.ID).Update("taken_down", false).Error; err != nil { 286 286 return err 287 287 } 288 288 u.SetTakenDown(false) ··· 290 290 return nil 291 291 } 292 292 293 - func (r *Relay) GetAccountPreviousState(ctx context.Context, uid uint64) (*slurper.AccountPreviousState, error) { 294 - var prevState slurper.AccountPreviousState 293 + func (r *Relay) GetAccountPreviousState(ctx context.Context, uid uint64) (*models.AccountPreviousState, error) { 294 + var prevState models.AccountPreviousState 295 295 if err := r.db.First(&prevState, uid).Error; err != nil { 296 296 if errors.Is(err, gorm.ErrRecordNotFound) { 297 297 return nil, ErrAccountLastUnavailable ··· 303 303 } 304 304 305 305 func (r *Relay) GetRepoRoot(ctx context.Context, uid uint64) (cid.Cid, error) { 306 - var prevState slurper.AccountPreviousState 306 + var prevState models.AccountPreviousState 307 307 err := r.db.First(&prevState, uid).Error 308 308 if err == nil { 309 309 return prevState.Cid.CID, nil ··· 325 325 return pdsHostname, nil 326 326 } 327 327 328 - func (r *Relay) ListAccounts(ctx context.Context, cursor int64, limit int) ([]*slurper.Account, error) { 328 + func (r *Relay) ListAccounts(ctx context.Context, cursor int64, limit int) ([]*models.Account, error) { 329 329 330 - accounts := []*slurper.Account{} 331 - if err := r.db.Model(&slurper.Account{}).Where("id > ? AND NOT taken_down AND (upstream_status IS NULL OR upstream_status = 'active')", cursor).Order("id").Limit(limit).Find(&accounts).Error; err != nil { 330 + accounts := []*models.Account{} 331 + if err := r.db.Model(&models.Account{}).Where("id > ? AND NOT taken_down AND (upstream_status IS NULL OR upstream_status = 'active')", cursor).Order("id").Limit(limit).Find(&accounts).Error; err != nil { 332 332 return nil, err 333 333 } 334 334 return accounts, nil
+13
cmd/relayered/relay/account_status.go
··· 1 + package relay 2 + 3 + var ( 4 + // AccountStatusActive is not in the spec but used internally 5 + AccountStatusActive = "active" 6 + 7 + AccountStatusDeactivated = "deactivated" 8 + AccountStatusDeleted = "deleted" 9 + AccountStatusDesynchronized = "desynchronized" 10 + AccountStatusSuspended = "suspended" 11 + AccountStatusTakendown = "takendown" 12 + AccountStatusThrottled = "throttled" 13 + )
+4 -4
cmd/relayered/relay/domain_ban.go
··· 4 4 "context" 5 5 "strings" 6 6 7 - "github.com/bluesky-social/indigo/cmd/relayered/relay/slurper" 7 + "github.com/bluesky-social/indigo/cmd/relayered/relay/models" 8 8 ) 9 9 10 10 // DomainIsBanned checks if the given host is banned, starting with the host ··· 42 42 } 43 43 44 44 func (r *Relay) findDomainBan(ctx context.Context, host string) (bool, error) { 45 - var db slurper.DomainBan 46 - if err := r.db.Find(&db, "domain = ?", host).Error; err != nil { 45 + var ban models.DomainBan 46 + if err := r.db.Find(&ban, "domain = ?", host).Error; err != nil { 47 47 return false, err 48 48 } 49 49 50 - if db.ID == 0 { 50 + if ban.ID == 0 { 51 51 return false, nil 52 52 } 53 53
+17 -11
cmd/relayered/relay/firehose.go
··· 9 9 10 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 - "github.com/bluesky-social/indigo/cmd/relayered/relay/slurper" 12 + "github.com/bluesky-social/indigo/cmd/relayered/relay/models" 13 13 "github.com/bluesky-social/indigo/cmd/relayered/stream" 14 14 15 15 "github.com/ipfs/go-cid" ··· 18 18 ) 19 19 20 20 // handleFedEvent() is the callback passed to Slurper called from Slurper.handleConnection() 21 - func (r *Relay) handleFedEvent(ctx context.Context, host *slurper.PDS, env *stream.XRPCStreamEvent) error { 21 + func (r *Relay) handleFedEvent(ctx context.Context, host *models.PDS, env *stream.XRPCStreamEvent) error { 22 22 ctx, span := tracer.Start(ctx, "handleFedEvent") 23 23 defer span.End() 24 24 ··· 110 110 } 111 111 112 112 // Process the account status change 113 - repoStatus := slurper.AccountStatusActive 113 + repoStatus := AccountStatusActive 114 114 if !env.RepoAccount.Active && env.RepoAccount.Status != nil { 115 115 repoStatus = *env.RepoAccount.Status 116 116 } ··· 128 128 // override with local status 129 129 if account.GetTakenDown() { 130 130 shouldBeActive = false 131 - status = &slurper.AccountStatusTakendown 131 + status = &AccountStatusTakendown 132 132 } 133 133 134 134 // Broadcast the account event to all consumers ··· 161 161 } 162 162 } 163 163 164 - func (r *Relay) handleCommit(ctx context.Context, host *slurper.PDS, evt *comatproto.SyncSubscribeRepos_Commit) error { 164 + func (r *Relay) handleCommit(ctx context.Context, host *models.PDS, evt *comatproto.SyncSubscribeRepos_Commit) error { 165 165 r.Logger.Debug("relay got repo append event", "seq", evt.Seq, "pdsHost", host.Host, "repo", evt.Repo) 166 166 167 167 account, err := r.LookupUserByDid(ctx, evt.Repo) ··· 184 184 185 185 ustatus := account.GetUpstreamStatus() 186 186 187 - if account.GetTakenDown() || ustatus == slurper.AccountStatusTakendown { 187 + if account.GetTakenDown() || ustatus == AccountStatusTakendown { 188 188 r.Logger.Debug("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host) 189 189 repoCommitsResultCounter.WithLabelValues(host.Host, "tdu").Inc() 190 190 return nil 191 191 } 192 192 193 - if ustatus == slurper.AccountStatusSuspended { 193 + if ustatus == AccountStatusSuspended { 194 194 r.Logger.Debug("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host) 195 195 repoCommitsResultCounter.WithLabelValues(host.Host, "susu").Inc() 196 196 return nil 197 197 } 198 198 199 - if ustatus == slurper.AccountStatusDeactivated { 199 + if ustatus == AccountStatusDeactivated { 200 200 r.Logger.Debug("dropping commit event from deactivated user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host) 201 201 repoCommitsResultCounter.WithLabelValues(host.Host, "du").Inc() 202 202 return nil ··· 225 225 } 226 226 } 227 227 228 - var prevState slurper.AccountPreviousState 228 + // TODO: very messy fetch code here 229 + var prevState models.AccountPreviousState 229 230 err = r.db.First(&prevState, account.ID).Error 230 231 prevP := &prevState 231 232 if errors.Is(err, gorm.ErrRecordNotFound) { ··· 236 237 } 237 238 dbPrevRootStr := "" 238 239 dbPrevSeqStr := "" 240 + var prevRev *syntax.TID 241 + var prevData *cid.Cid 239 242 if prevP != nil { 240 243 if prevState.Seq >= evt.Seq && ((prevState.Seq - evt.Seq) < 2000) { 241 244 // ignore catchup overlap of 200 on some subscribeRepos restarts 242 245 repoCommitsResultCounter.WithLabelValues(host.Host, "dup").Inc() 243 246 return nil 244 247 } 248 + prevData = &prevState.Cid.CID 249 + t := syntax.TID(prevState.Rev) 250 + prevRev = &t 245 251 dbPrevRootStr = prevState.Cid.CID.String() 246 252 dbPrevSeqStr = strconv.FormatInt(prevState.Seq, 10) 247 253 } ··· 249 255 if evt.PrevData != nil { 250 256 evtPrevDataStr = ((*cid.Cid)(evt.PrevData)).String() 251 257 } 252 - newRootCid, err := r.Validator.HandleCommit(ctx, host, account, evt, prevP) 258 + newRootCid, err := r.Validator.HandleCommit(ctx, host, account, evt, prevRev, prevData) 253 259 if err != nil { 254 260 // XXX: induction trace log 255 261 r.Logger.Error("commit bad", "seq", evt.Seq, "pseq", dbPrevSeqStr, "pdsHost", host.Host, "repo", evt.Repo, "prev", evtPrevDataStr, "dbprev", dbPrevRootStr, "err", err) ··· 281 287 } 282 288 283 289 // handleSync processes #sync messages 284 - func (r *Relay) handleSync(ctx context.Context, host *slurper.PDS, evt *comatproto.SyncSubscribeRepos_Sync) error { 290 + func (r *Relay) handleSync(ctx context.Context, host *models.PDS, evt *comatproto.SyncSubscribeRepos_Sync) error { 285 291 account, err := r.LookupUserByDid(ctx, evt.Did) 286 292 if err != nil { 287 293 if !errors.Is(err, gorm.ErrRecordNotFound) {
+7 -6
cmd/relayered/relay/relay.go
··· 6 6 7 7 "github.com/bluesky-social/indigo/atproto/identity" 8 8 "github.com/bluesky-social/indigo/cmd/relayered/relay/slurper" 9 + "github.com/bluesky-social/indigo/cmd/relayered/relay/models" 9 10 "github.com/bluesky-social/indigo/cmd/relayered/relay/validator" 10 11 "github.com/bluesky-social/indigo/cmd/relayered/stream/eventmgr" 11 12 "github.com/bluesky-social/indigo/xrpc" ··· 37 38 consumers map[uint64]*SocketConsumer 38 39 39 40 // Account cache 40 - userCache *lru.Cache[string, *slurper.Account] 41 + userCache *lru.Cache[string, *models.Account] 41 42 } 42 43 43 44 type RelayConfig struct { ··· 64 65 config = DefaultRelayConfig() 65 66 } 66 67 67 - uc, _ := lru.New[string, *slurper.Account](2_000_000) 68 + uc, _ := lru.New[string, *models.Account](2_000_000) 68 69 69 70 r := &Relay{ 70 71 db: db, ··· 102 103 } 103 104 104 105 func (r *Relay) MigrateDatabase() error { 105 - if err := r.db.AutoMigrate(slurper.DomainBan{}); err != nil { 106 + if err := r.db.AutoMigrate(models.DomainBan{}); err != nil { 106 107 return err 107 108 } 108 - if err := r.db.AutoMigrate(slurper.PDS{}); err != nil { 109 + if err := r.db.AutoMigrate(models.PDS{}); err != nil { 109 110 return err 110 111 } 111 - if err := r.db.AutoMigrate(slurper.Account{}); err != nil { 112 + if err := r.db.AutoMigrate(models.Account{}); err != nil { 112 113 return err 113 114 } 114 - if err := r.db.AutoMigrate(slurper.AccountPreviousState{}); err != nil { 115 + if err := r.db.AutoMigrate(models.AccountPreviousState{}); err != nil { 115 116 return err 116 117 } 117 118 return nil
-32
cmd/relayered/relay/slurper/account.go
··· 1 - package slurper 2 - 3 - var ( 4 - // AccountStatusActive is not in the spec but used internally 5 - // the alternative would be an additional SQL column for "active" or status="" to imply active 6 - AccountStatusActive = "active" 7 - 8 - AccountStatusDeactivated = "deactivated" 9 - AccountStatusDeleted = "deleted" 10 - AccountStatusDesynchronized = "desynchronized" 11 - AccountStatusSuspended = "suspended" 12 - AccountStatusTakendown = "takendown" 13 - AccountStatusThrottled = "throttled" 14 - ) 15 - 16 - var AccountStatusList = []string{ 17 - AccountStatusActive, 18 - AccountStatusDeactivated, 19 - AccountStatusDeleted, 20 - AccountStatusDesynchronized, 21 - AccountStatusSuspended, 22 - AccountStatusTakendown, 23 - AccountStatusThrottled, 24 - } 25 - var AccountStatuses map[string]bool 26 - 27 - func init() { 28 - AccountStatuses = make(map[string]bool, len(AccountStatusList)) 29 - for _, status := range AccountStatusList { 30 - AccountStatuses[status] = true 31 - } 32 - }
+16 -16
cmd/relayered/relay/slurper/models.go cmd/relayered/relay/models/models.go
··· 1 - package slurper 1 + package models 2 2 3 3 import ( 4 4 "sync" ··· 13 13 type DomainBan struct { 14 14 gorm.Model 15 15 Domain string `gorm:"unique"` 16 - } 17 - 18 - type AccountPreviousState struct { 19 - Uid uint64 `gorm:"column:uid;primaryKey"` 20 - Cid DbCID `gorm:"column:cid"` 21 - Rev string `gorm:"column:rev"` 22 - Seq int64 `gorm:"column:seq"` 23 - } 24 - 25 - func (ups *AccountPreviousState) GetCid() cid.Cid { 26 - return ups.Cid.CID 27 - } 28 - func (ups *AccountPreviousState) GetRev() syntax.TID { 29 - xt, _ := syntax.ParseTID(ups.Rev) 30 - return xt 31 16 } 32 17 33 18 type PDS struct { ··· 111 96 defer account.lk.Unlock() 112 97 return account.UpstreamStatus 113 98 } 99 + 100 + type AccountPreviousState struct { 101 + Uid uint64 `gorm:"column:uid;primaryKey"` 102 + Cid DbCID `gorm:"column:cid"` 103 + Rev string `gorm:"column:rev"` 104 + Seq int64 `gorm:"column:seq"` 105 + } 106 + 107 + func (ups *AccountPreviousState) GetCid() cid.Cid { 108 + return ups.Cid.CID 109 + } 110 + func (ups *AccountPreviousState) GetRev() syntax.TID { 111 + xt, _ := syntax.ParseTID(ups.Rev) 112 + return xt 113 + }
+1 -1
cmd/relayered/relay/slurper/models_dbcid.go cmd/relayered/relay/models/dbcid.go
··· 1 - package slurper 1 + package models 2 2 3 3 import ( 4 4 "database/sql/driver"
+12 -11
cmd/relayered/relay/slurper/slurper.go
··· 13 13 "github.com/RussellLuo/slidingwindow" 14 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 15 "github.com/bluesky-social/indigo/cmd/relayered/stream" 16 + "github.com/bluesky-social/indigo/cmd/relayered/relay/models" 16 17 "github.com/bluesky-social/indigo/cmd/relayered/stream/schedulers/parallel" 17 18 18 19 "github.com/gorilla/websocket" 19 20 "gorm.io/gorm" 20 21 ) 21 22 22 - type IndexCallback func(context.Context, *PDS, *stream.XRPCStreamEvent) error 23 + type IndexCallback func(context.Context, *models.PDS, *stream.XRPCStreamEvent) error 23 24 24 25 type Slurper struct { 25 26 cb IndexCallback ··· 76 77 } 77 78 78 79 type activeSub struct { 79 - pds *PDS 80 + pds *models.PDS 80 81 lk sync.RWMutex 81 82 ctx context.Context 82 83 cancel func() ··· 245 246 return nil 246 247 } 247 248 248 - var peering PDS 249 + var peering models.PDS 249 250 if err := s.db.Find(&peering, "host = ?", host).Error; err != nil { 250 251 return err 251 252 } ··· 261 262 return ErrNewSubsDisabled 262 263 } 263 264 // New PDS! 264 - npds := PDS{ 265 + npds := models.PDS{ 265 266 Host: host, 266 267 SSL: s.Config.SSL, 267 268 Registered: reg, ··· 286 287 287 288 if !peering.Registered && reg { 288 289 peering.Registered = true 289 - if err := s.db.Model(PDS{}).Where("id = ?", peering.ID).Update("registered", true).Error; err != nil { 290 + if err := s.db.Model(models.PDS{}).Where("id = ?", peering.ID).Update("registered", true).Error; err != nil { 290 291 return err 291 292 } 292 293 } ··· 310 311 s.lk.Lock() 311 312 defer s.lk.Unlock() 312 313 313 - var all []PDS 314 + var all []models.PDS 314 315 if err := s.db.Find(&all, "registered = true AND blocked = false").Error; err != nil { 315 316 return err 316 317 } ··· 334 335 return nil 335 336 } 336 337 337 - func (s *Slurper) subscribeWithRedialer(ctx context.Context, host *PDS, sub *activeSub, newHost bool) { 338 + func (s *Slurper) subscribeWithRedialer(ctx context.Context, host *models.PDS, sub *activeSub, newHost bool) { 338 339 defer func() { 339 340 s.lk.Lock() 340 341 defer s.lk.Unlock() ··· 384 385 385 386 if backoff > 15 { 386 387 s.log.Warn("pds does not appear to be online, disabling for now", "pdsHost", host.Host) 387 - if err := s.db.Model(&PDS{}).Where("id = ?", host.ID).Update("registered", false).Error; err != nil { 388 + if err := s.db.Model(&models.PDS{}).Where("id = ?", host.ID).Update("registered", false).Error; err != nil { 388 389 s.log.Error("failed to unregister failing pds", "err", err) 389 390 } 390 391 ··· 428 429 429 430 var EventsTimeout = time.Minute 430 431 431 - func (s *Slurper) handleConnection(ctx context.Context, host *PDS, con *websocket.Conn, lastCursor *int64, sub *activeSub) error { 432 + func (s *Slurper) handleConnection(ctx context.Context, host *models.PDS, con *websocket.Conn, lastCursor *int64, sub *activeSub) error { 432 433 ctx, cancel := context.WithCancel(ctx) 433 434 defer cancel() 434 435 ··· 594 595 595 596 tx := s.db.WithContext(ctx).Begin() 596 597 for _, cursor := range cursors { 597 - if err := tx.WithContext(ctx).Model(PDS{}).Where("id = ?", cursor.id).UpdateColumn("cursor", cursor.cursor).Error; err != nil { 598 + if err := tx.WithContext(ctx).Model(models.PDS{}).Where("id = ?", cursor.id).UpdateColumn("cursor", cursor.cursor).Error; err != nil { 598 599 errs = append(errs, err) 599 600 } else { 600 601 okcount++ ··· 634 635 // cleanup in the run thread subscribeWithRedialer() will delete(s.active, host) 635 636 636 637 if block { 637 - if err := s.db.Model(PDS{}).Where("id = ?", ac.pds.ID).UpdateColumn("blocked", true).Error; err != nil { 638 + if err := s.db.Model(models.PDS{}).Where("id = ?", ac.pds.ID).UpdateColumn("blocked", true).Error; err != nil { 638 639 return fmt.Errorf("failed to set host as blocked: %w", err) 639 640 } 640 641 }
+10 -11
cmd/relayered/relay/validator/validator.go
··· 13 13 "github.com/bluesky-social/indigo/atproto/identity" 14 14 "github.com/bluesky-social/indigo/atproto/repo" 15 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 - "github.com/bluesky-social/indigo/cmd/relayered/relay/slurper" 16 + "github.com/bluesky-social/indigo/cmd/relayered/relay/models" 17 17 18 18 "github.com/ipfs/go-cid" 19 19 "go.opentelemetry.io/otel" ··· 58 58 } 59 59 60 60 type NextCommitHandler interface { 61 - HandleCommit(ctx context.Context, host *slurper.PDS, uid uint64, did string, commit *comatproto.SyncSubscribeRepos_Commit) error 61 + HandleCommit(ctx context.Context, host *models.PDS, uid uint64, did string, commit *comatproto.SyncSubscribeRepos_Commit) error 62 62 } 63 63 64 64 type userLock struct { ··· 99 99 } 100 100 } 101 101 102 - func (val *Validator) HandleCommit(ctx context.Context, host *slurper.PDS, account *slurper.Account, commit *comatproto.SyncSubscribeRepos_Commit, prevRoot *slurper.AccountPreviousState) (newRoot *cid.Cid, err error) { 102 + func (val *Validator) HandleCommit(ctx context.Context, host *models.PDS, account *models.Account, commit *comatproto.SyncSubscribeRepos_Commit, prevRev *syntax.TID, prevData *cid.Cid) (newRoot *cid.Cid, err error) { 103 103 uid := account.GetUid() 104 104 unlock := val.lockUser(ctx, uid) 105 105 defer unlock() 106 - repoFragment, err := val.VerifyCommitMessage(ctx, host, commit, prevRoot) 106 + repoFragment, err := val.VerifyCommitMessage(ctx, host, commit, prevRev, prevData) 107 107 if err != nil { 108 108 return nil, err 109 109 } ··· 124 124 125 125 var ErrNewRevBeforePrevRev = &revOutOfOrderError{} 126 126 127 - func (val *Validator) VerifyCommitMessage(ctx context.Context, host *slurper.PDS, msg *comatproto.SyncSubscribeRepos_Commit, prevRoot *slurper.AccountPreviousState) (*repo.Repo, error) { 127 + func (val *Validator) VerifyCommitMessage(ctx context.Context, host *models.PDS, msg *comatproto.SyncSubscribeRepos_Commit, prevRev *syntax.TID, prevData *cid.Cid) (*repo.Repo, error) { 128 128 hostname := host.Host 129 129 hasWarning := false 130 130 commitVerifyStarts.Inc() ··· 140 140 commitVerifyErrors.WithLabelValues(hostname, "tid").Inc() 141 141 return nil, err 142 142 } 143 - if prevRoot != nil { 144 - prevRev := prevRoot.GetRev() 143 + if prevRev != nil { 145 144 curTime := rev.Time() 146 145 prevTime := prevRev.Time() 147 146 if curTime.Before(prevTime) { ··· 246 245 247 246 if msg.PrevData != nil { 248 247 c := (*cid.Cid)(msg.PrevData) 249 - if prevRoot != nil { 250 - if *c != prevRoot.GetCid() { 248 + if prevData != nil { 249 + if *c != *prevData { 251 250 commitVerifyWarnings.WithLabelValues(hostname, "pr").Inc() 252 251 // XXX: induction trace log 253 252 val.log.Warn("commit prevData mismatch", "seq", msg.Seq, "pdsHost", host.Host, "repo", msg.Repo) ··· 288 287 } 289 288 //logger.Debug("prevData matched", "prevData", c.String(), "computed", computed.String()) 290 289 291 - if prevRoot == nil { 290 + if prevData == nil { 292 291 commitVerifyOkish.WithLabelValues(hostname, "new").Inc() 293 292 } else if hasWarning { 294 293 commitVerifyOkish.WithLabelValues(hostname, "warn").Inc() ··· 306 305 } 307 306 308 307 // HandleSync checks signed commit from a #sync message 309 - func (val *Validator) HandleSync(ctx context.Context, host *slurper.PDS, msg *comatproto.SyncSubscribeRepos_Sync) (newRoot *cid.Cid, err error) { 308 + func (val *Validator) HandleSync(ctx context.Context, host *models.PDS, msg *comatproto.SyncSubscribeRepos_Sync) (newRoot *cid.Cid, err error) { 310 309 hostname := host.Host 311 310 hasWarning := false 312 311