this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor away syncHostAccount

+188 -366
+4 -4
cmd/rerelay/main.go
··· 210 210 211 211 persistDir := cctx.String("persist-dir") 212 212 os.MkdirAll(persistDir, os.ModePerm) 213 - pOpts := diskpersist.DefaultDiskPersistOptions() 214 - pOpts.Retention = cctx.Duration("replay-window") 215 - logger.Info("setting up disk persister", "dir", persistDir, "replayWindow", pOpts.Retention) 216 - persister, err := diskpersist.NewDiskPersistence(persistDir, "", db, pOpts) 213 + persitConfig := diskpersist.DefaultDiskPersistOptions() 214 + persitConfig.Retention = cctx.Duration("replay-window") 215 + logger.Info("setting up disk persister", "dir", persistDir, "replayWindow", persitConfig.Retention) 216 + persister, err := diskpersist.NewDiskPersistence(persistDir, "", db, persitConfig) 217 217 if err != nil { 218 218 return fmt.Errorf("setting up disk persister: %w", err) 219 219 }
+82 -140
cmd/rerelay/relay/account.go
··· 4 4 "context" 5 5 "errors" 6 6 "fmt" 7 - "net/url" 8 - "strings" 9 - "time" 10 7 11 8 "github.com/bluesky-social/indigo/atproto/syntax" 12 9 "github.com/bluesky-social/indigo/cmd/rerelay/relay/models" ··· 54 51 return &repo, nil 55 52 } 56 53 57 - /* XXX: refactor in to syncHostAccount? */ 58 - func (r *Relay) CreateAccount(ctx context.Context, host *models.Host, did syntax.DID) (*models.Account, error) { 59 - newUsersDiscovered.Inc() 60 - start := time.Now() 61 - account, err := r.syncHostAccount(ctx, did, host, nil) 62 - newUserDiscoveryDuration.Observe(time.Since(start).Seconds()) 63 - if err != nil { 64 - return nil, fmt.Errorf("fed event create external user: %w", err) 65 - } 66 - return account, nil 67 - } 68 - 69 - // syncHostAccount ensures that a DID has an account record in the database attached to a Host record in the database 70 - // Some fields may be updated if needed. 71 - // did is the user 72 - // host is the Host we received this from, not necessarily the canonical Host in the DID document 73 - // cachedAccount is (optionally) the account that we have already looked up from cache or database 74 - func (r *Relay) syncHostAccount(ctx context.Context, did syntax.DID, host *models.Host, cachedAccount *models.Account) (*models.Account, error) { 75 - ctx, span := tracer.Start(ctx, "syncHostAccount") 76 - defer span.End() 77 - 78 - externalUserCreationAttempts.Inc() 54 + // Attempts creation of a new account associated with the given host, presumably because the account was discovered on that host's stream. 55 + // 56 + // If the account's identity doesn't match the host, this will fail. We only create accounts associated with hosts we already know of, not remote hosts (aka, no spidering). 57 + func (r *Relay) CreateHostAccount(ctx context.Context, did syntax.DID, hostID uint64, hostname string) (*models.Account, error) { 58 + // NOTE: this method doesn't use locking. the database UNIQUE constraint should prevent duplicate account creation. 59 + logger := r.Logger.With("did", did, "hostname", hostname) 79 60 80 - r.Logger.Debug("create external user", "did", did) 61 + //newUsersDiscovered.Inc() 62 + //start := time.Now() 81 63 82 - // lookup identity so that we know a DID's canonical source Host 83 64 ident, err := r.dir.LookupDID(ctx, did) 84 65 if err != nil { 85 - return nil, fmt.Errorf("no ident for did %s, %w", did, err) 66 + return nil, fmt.Errorf("new account identity resolution: %w", err) 86 67 } 87 68 pdsEndpoint := ident.PDSEndpoint() 88 69 if pdsEndpoint == "" { 89 - return nil, fmt.Errorf("account has no PDS endpoint registered: %s", did) 70 + return nil, fmt.Errorf("new account has no declared PDS: %s", did) 90 71 } 91 - durl, err := url.Parse(pdsEndpoint) 72 + pdsHostname, _, err := ParseHostname(pdsEndpoint) 92 73 if err != nil { 93 - return nil, fmt.Errorf("account has bad url (%#v): %w", pdsEndpoint, err) 74 + return nil, fmt.Errorf("new account PDS endpoint invalid: %s", pdsEndpoint) 94 75 } 95 76 96 - // is the canonical Host banned? 97 - ban, err := r.DomainIsBanned(ctx, durl.Host) 98 - if err != nil { 99 - return nil, fmt.Errorf("failed to check pds ban status: %w", err) 100 - } 101 - if ban { 102 - return nil, fmt.Errorf("cannot create user on pds with banned domain") 77 + if pdsHostname != hostname { 78 + if r.Config.SkipAccountHostCheck { 79 + logger.Warn("ignoring account host mismatch", "pdsHostname", pdsHostname) 80 + } else { 81 + return nil, fmt.Errorf("new account from a different host: %s", pdsHostname) 82 + } 103 83 } 104 84 105 - if strings.HasPrefix(durl.Host, "localhost:") { 106 - durl.Scheme = "http" 107 - } 85 + // TODO: fetch the full host, apply throttling or rate-limits? 108 86 109 - var canonicalHost *models.Host 110 - if host.Hostname == durl.Host { 111 - // we got the message from the canonical Host, convenient! 112 - canonicalHost = host 113 - } else { 114 - // we got the message from an intermediate relay 115 - // check our db for info on canonical Host 116 - // XXX: rename "peering" 117 - var peering models.Host 118 - if err := r.db.Find(&peering, "hostname = ?", durl.Host).Error; err != nil { 119 - r.Logger.Error("failed to find host", "host", durl.Host) 120 - return nil, err 121 - } 122 - canonicalHost = &peering 87 + // TODO: could be verifying upstream status here (using r.HostChecker) 88 + // XXX: limits/throttling; reach in to Slurper? 89 + acc := models.Account{ 90 + DID: did.String(), 91 + HostID: hostID, 92 + Status: models.AccountStatusActive, 93 + UpstreamStatus: models.AccountStatusActive, 123 94 } 124 95 125 - if canonicalHost.Status == models.HostStatusBanned { 126 - return nil, fmt.Errorf("refusing to create user with banned Host") 127 - } 128 - 129 - if canonicalHost.ID == 0 { 130 - // we got an event from a non-canonical Host (an intermediate relay) 131 - // a non-canonical Host we haven't seen before; ping it to make sure it's real 132 - // TODO: what do we actually want to track about the source we immediately got this message from vs the canonical Host? 133 - r.Logger.Warn("new host discovered in create user flow", "pds", durl.String(), "did", did) 134 - 135 - err = r.HostChecker.CheckHost(ctx, durl.String()) 136 - if err != nil { 137 - // TODO: failing this shouldn't halt our indexing 138 - return nil, fmt.Errorf("failed to check unrecognized pds: %w", err) 139 - } 140 - 141 - // could check other things, a valid response is good enough for now 142 - canonicalHost.Hostname = durl.Host 143 - canonicalHost.NoSSL = !(durl.Scheme == "https") 144 - // XXX canonicalHost.RateLimit = float64(r.Slurper.Config.DefaultPerSecondLimit) 145 - // XXX canonicalHost.HourlyEventLimit = r.Slurper.Config.DefaultPerHourLimit 146 - // XXX canonicalHost.DailyEventLimit = r.Slurper.Config.DefaultPerDayLimit 147 - canonicalHost.AccountLimit = r.Slurper.Config.DefaultRepoLimit 148 - 149 - if r.Config.SSL && canonicalHost.NoSSL { 150 - return nil, fmt.Errorf("did references non-ssl Host, this is disallowed in prod: %q %q", did, pdsEndpoint) 96 + // create Account row and increment host count in the same transaction 97 + err = r.db.Transaction(func(tx *gorm.DB) error { 98 + if err := tx.Model(&models.Host{}).Where("id = ?", hostID).Update("account_count", gorm.Expr("account_count + 1")).Error; err != nil { 99 + return fmt.Errorf("failed to increment account count for host (%s): %w", hostname, err) 151 100 } 152 - 153 - if err := r.db.Create(&canonicalHost).Error; err != nil { 154 - return nil, err 101 + if err := tx.Create(&acc).Error; err != nil { 102 + return fmt.Errorf("failed to create account: %w", err) 155 103 } 104 + return nil 105 + }) 106 + if err != nil { 107 + return nil, err 156 108 } 157 109 158 - if canonicalHost.ID == 0 { 159 - panic("somehow failed to create a pds entry?") 160 - } 110 + r.accountCache.Add(did.String(), &acc) 161 111 162 - if canonicalHost.AccountCount >= canonicalHost.AccountLimit { 163 - // TODO: soft-limit / hard-limit ? create account in 'throttled' state, unless there are _really_ too many accounts 164 - return nil, fmt.Errorf("refusing to create user on Host at max repo limit for pds %q", canonicalHost.Hostname) 165 - } 112 + //newUserDiscoveryDuration.Observe(time.Since(start).Seconds()) 113 + return &acc, nil 114 + } 166 115 167 - // this lock just governs the lower half of this function 168 - r.accountLk.Lock() 169 - defer r.accountLk.Unlock() 116 + // Checks if account matches provided hostID, and in the fast pass returns successfully. If not, checks if the account should be updated. If the account is now on the indicated host, it is updated, both in the database and struct via pointer. 117 + // 118 + // TODO: could also update to another known host, if doesn't match this hostID? 119 + func (r *Relay) EnsureAccountHost(ctx context.Context, acc *models.Account, hostID uint64, hostname string) error { 120 + did := syntax.DID(acc.DID) 121 + logger := r.Logger.With("did", did, "hostname", hostname) 170 122 171 - if cachedAccount == nil { 172 - cachedAccount, err = r.GetAccount(ctx, did) 123 + if acc.HostID == hostID { 124 + return nil 173 125 } 174 - if errors.Is(err, ErrAccountNotFound) || errors.Is(err, gorm.ErrRecordNotFound) { 175 - err = nil 126 + 127 + ident, err := r.dir.LookupDID(ctx, did) 128 + if err != nil { 129 + return fmt.Errorf("account identity resolution: %w", err) 176 130 } 131 + pdsEndpoint := ident.PDSEndpoint() 132 + if pdsEndpoint == "" { 133 + return fmt.Errorf("account has no declared PDS: %s", did) 134 + } 135 + pdsHostname, _, err := ParseHostname(pdsEndpoint) 177 136 if err != nil { 178 - return nil, err 137 + return fmt.Errorf("account PDS endpoint invalid: %s", pdsEndpoint) 179 138 } 180 - if cachedAccount != nil { 181 - // XXX: caHost := cachedAccount.GetHost() 182 - caHost := cachedAccount.HostID 183 - if caHost != canonicalHost.ID { 184 - // Account is now on a different Host, update 185 - err = r.db.Transaction(func(tx *gorm.DB) error { 186 - if caHost != 0 { 187 - // decrement prior Host's account count 188 - tx.Model(&models.Host{}).Where("id = ?", caHost).Update("account_count", gorm.Expr("account_count - 1")) 189 - } 190 - // update user's Host ID 191 - res := tx.Model(models.Account{}).Where("uid = ?", cachedAccount.UID).Update("host_id", canonicalHost.ID) 192 - if res.Error != nil { 193 - return fmt.Errorf("failed to update users pds: %w", res.Error) 194 - } 195 - // increment new Host's account count 196 - res = tx.Model(&models.Host{}).Where("id = ? AND account_count < account_limit", canonicalHost.ID).Update("account_count", gorm.Expr("account_count + 1")) 197 - return nil 198 - }) 199 139 200 - // XXX: cachedAccount.SetHost(canonicalHost.ID) 201 - cachedAccount.HostID = canonicalHost.ID 202 - 203 - // flush account cache 204 - r.accountCache.Remove(did.String()) 140 + if pdsHostname != hostname { 141 + if r.Config.SkipAccountHostCheck { 142 + logger.Warn("ignoring account host mismatch", "pdsHostname", pdsHostname) 143 + return nil 144 + } else { 145 + return fmt.Errorf("new account from a different host: %s", pdsHostname) 205 146 } 206 - return cachedAccount, nil 207 147 } 208 148 209 - newAccount := models.Account{ 210 - DID: did.String(), 211 - HostID: canonicalHost.ID, 212 - Status: models.AccountStatusActive, 213 - UpstreamStatus: models.AccountStatusActive, 214 - } 149 + // TODO: could check upstream status here (using r.HostChecker) 150 + // TODO: for example, a moved account might go from takendown to active 151 + // XXX: limits/throttling; read in to Slurper? 215 152 153 + // create Account row and increment host count in the same transaction 216 154 err = r.db.Transaction(func(tx *gorm.DB) error { 217 - res := tx.Model(&models.Host{}).Where("id = ? AND account_count < account_limit", canonicalHost.ID).Update("account_count", gorm.Expr("account_count + 1")) 218 - if res.Error != nil { 219 - return fmt.Errorf("failed to increment repo count for pds %q: %w", canonicalHost.Hostname, res.Error) 155 + // decrement old host count 156 + if err := tx.Model(&models.Host{}).Where("id = ?", acc.HostID).Update("account_count", gorm.Expr("account_count - 1")).Error; err != nil { 157 + return fmt.Errorf("failed to decrement account count for former host (%d): %w", acc.HostID, err) 220 158 } 221 - if terr := tx.Create(&newAccount).Error; terr != nil { 222 - r.Logger.Error("failed to create user", "did", newAccount.DID, "err", terr) 223 - return fmt.Errorf("failed to create other pds user: %w", terr) 159 + // increment new host count 160 + if err := tx.Model(&models.Host{}).Where("id = ?", hostID).Update("account_count", gorm.Expr("account_count + 1")).Error; err != nil { 161 + return fmt.Errorf("failed to increment account count for host (%s): %w", hostname, err) 162 + } 163 + if err := tx.Model(models.Account{}).Where("uid = ?", acc.UID).Update("host_id", hostID).Error; err != nil { 164 + return fmt.Errorf("failed update account HostID: %w", err) 224 165 } 225 166 return nil 226 167 }) 227 168 if err != nil { 228 - r.Logger.Error("user create and pds inc err", "err", err) 229 - return nil, err 169 + return err 230 170 } 231 171 232 - r.accountCache.Add(did.String(), &newAccount) 172 + // evict stale record from account cache 173 + r.accountCache.Remove(did.String()) 233 174 234 - return &newAccount, nil 175 + acc.HostID = hostID 176 + return nil 235 177 } 236 178 237 179 func (r *Relay) UpdateAccountStatus(ctx context.Context, did syntax.DID, status models.AccountStatus) error {
+89 -189
cmd/rerelay/relay/ingest.go
··· 4 4 "context" 5 5 "errors" 6 6 "fmt" 7 + "log/slog" 7 8 "time" 8 9 9 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 11 + "github.com/bluesky-social/indigo/atproto/identity" 10 12 "github.com/bluesky-social/indigo/atproto/syntax" 11 13 "github.com/bluesky-social/indigo/cmd/rerelay/relay/models" 12 14 "github.com/bluesky-social/indigo/cmd/rerelay/stream" 13 15 14 16 "go.opentelemetry.io/otel/attribute" 15 - "gorm.io/gorm" 16 17 ) 17 18 18 19 // This callback function gets called by Slurper on every upstream repo stream message from any host. ··· 56 57 } 57 58 } 58 59 59 - func (r *Relay) processCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit, hostname string, hostID uint64) error { 60 - logger := r.Logger.With("did", evt.Repo, "seq", evt.Seq, "host", hostname, "eventType", "commit", "rev", evt.Rev) 61 - logger.Debug("relay got repo append event") 60 + // handles the shared part of event processing: that the account existing, is associated with this host, etc 61 + func (r *Relay) preProcessEvent(ctx context.Context, didStr string, hostname string, hostID uint64, logger *slog.Logger) (*models.Account, *identity.Identity, error) { 62 62 63 - did, err := syntax.ParseDID(evt.Repo) 63 + did, err := syntax.ParseDID(didStr) 64 64 if err != nil { 65 - return fmt.Errorf("invalid DID in message: %w", err) 65 + return nil, nil, fmt.Errorf("invalid DID in message: %w", err) 66 66 } 67 - 68 67 // XXX: did = did.Normalize() 69 - account, err := r.GetAccount(ctx, did) 68 + 69 + acc, err := r.GetAccount(ctx, did) 70 70 if err != nil { 71 - if !errors.Is(err, gorm.ErrRecordNotFound) { 72 - return fmt.Errorf("looking up event user: %w", err) 71 + if !errors.Is(err, ErrAccountNotFound) { 72 + return nil, nil, fmt.Errorf("fetching account: %w", err) 73 73 } 74 74 75 - host, err := r.GetHost(ctx, hostID) 76 - if err != nil { 77 - return err 78 - } 79 - account, err = r.CreateAccount(ctx, host, did) 75 + acc, err = r.CreateHostAccount(ctx, did, hostID, hostname) 80 76 if err != nil { 81 - return err 77 + return nil, nil, err 82 78 } 83 79 } 84 80 85 - if account == nil { 86 - return ErrAccountNotFound 81 + if acc == nil { 82 + // TODO: this is defensive and could be removed 83 + panic(ErrAccountNotFound) 87 84 } 88 85 89 - // XXX: lock on account 90 - ustatus := account.UpstreamStatus 91 - 92 - // XXX: lock on account 93 - if account.Status == models.AccountStatusTakendown || ustatus == models.AccountStatusTakendown { 94 - logger.Debug("dropping commit event from taken down user") 95 - return nil 86 + // verify that the account is on the subscribed host (or update if it should be) 87 + if err := r.EnsureAccountHost(ctx, acc, hostID, hostname); err != nil { 88 + return nil, nil, err 96 89 } 97 90 98 - if ustatus == models.AccountStatusSuspended { 99 - logger.Debug("dropping commit event from suspended user") 100 - return nil 91 + // skip identity lookup if account is not active 92 + if acc.Status != models.AccountStatusActive || acc.UpstreamStatus != models.AccountStatusActive { 93 + return acc, nil, nil 101 94 } 102 95 103 - if ustatus == models.AccountStatusDeactivated { 104 - logger.Debug("dropping commit event from deactivated user") 105 - return nil 96 + ident, err := r.dir.LookupDID(ctx, did) 97 + if err != nil { 98 + // XXX: handle more granularly (eg, true NotFound vs other errors); and add tests 99 + logger.Warn("failed to load identity") 106 100 } 101 + return acc, ident, nil 102 + } 107 103 108 - if evt.Rebase { 109 - return fmt.Errorf("rebase was true in event seq:%d,host:%s", evt.Seq, hostname) 110 - } 104 + func (r *Relay) processCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit, hostname string, hostID uint64) error { 105 + logger := r.Logger.With("did", evt.Repo, "seq", evt.Seq, "host", hostname, "eventType", "commit", "rev", evt.Rev) 106 + logger.Debug("relay got repo append event") 111 107 112 - accountHostId := account.HostID 113 - if hostID != accountHostId && accountHostId != 0 { 114 - // XXX: metter logging 115 - logger.Warn("received event for repo from different pds than expected", "expectedHostID", accountHostId, "receivedHost", hostname) 116 - // Flush any cached DID documents for this user 117 - err = r.dir.Purge(ctx, did.AtIdentifier()) 118 - if err != nil { 119 - logger.Error("problem purging identity directory cache", "err", err) 120 - } 121 - 122 - // XXX: shouldn't need full Host? 123 - host, err := r.GetHost(ctx, hostID) 124 - if err != nil { 125 - return err 126 - } 127 - 128 - account, err = r.syncHostAccount(ctx, did, host, account) 129 - if err != nil { 130 - return err 131 - } 132 - 133 - if account.HostID != hostID && !r.Config.SkipAccountHostCheck { 134 - return fmt.Errorf("event from non-authoritative pds") 135 - } 108 + acc, ident, err := r.preProcessEvent(ctx, evt.Repo, hostname, hostID, logger) 109 + if err != nil { 110 + return err 136 111 } 137 112 138 - ident, err := r.dir.LookupDID(ctx, did) 139 - if err != nil { 140 - // XXX: handle more granularly (eg, true not-founds vs errors); and add tests 141 - logger.Warn("failed to load identity") 113 + if acc.Status != models.AccountStatusActive || acc.UpstreamStatus != models.AccountStatusActive { 114 + logger.Info("dropping commit message for non-active account", "status", acc.Status, "upstreamStatus", acc.UpstreamStatus) 115 + return nil 142 116 } 143 117 144 - var prevRepo *models.AccountRepo 145 - err = r.db.First(prevRepo, account.UID).Error 146 - if err != nil { 147 - if !errors.Is(err, gorm.ErrRecordNotFound) { 148 - // TODO: should this be a hard error? 149 - logger.Error("failed to read previous repo state", "err", err) 150 - } 151 - prevRepo = nil 118 + prevRepo, err := r.GetAccountRepo(ctx, acc.UID) 119 + if err != nil && !errors.Is(err, ErrAccountRepoNotFound) { 120 + // TODO: should this be a hard error? 121 + logger.Error("failed to read previous repo state", "err", err) 152 122 } 153 123 154 124 // most commit validation happens in this method. Note that is handles lenient/strict modes. ··· 158 128 return err 159 129 } 160 130 161 - err = r.UpsertAccountRepo(account.UID, syntax.TID(newRepo.Rev), newRepo.CommitCID, newRepo.CommitData) 131 + err = r.UpsertAccountRepo(acc.UID, syntax.TID(newRepo.Rev), newRepo.CommitCID, newRepo.CommitData) 162 132 if err != nil { 163 - return fmt.Errorf("failed to upsert account repo (%s): %w", account.DID, err) 133 + return fmt.Errorf("failed to upsert account repo (%s): %w", acc.DID, err) 164 134 } 165 135 166 - // Broadcast the identity event to all consumers 136 + // emit the event 167 137 // TODO: is this copy important? 168 138 commitCopy := *evt 169 139 err = r.Events.AddEvent(ctx, &stream.XRPCStreamEvent{ 170 140 RepoCommit: &commitCopy, 171 - PrivUid: account.UID, 141 + PrivUid: acc.UID, 172 142 }) 173 143 if err != nil { 174 - logger.Error("failed to broadcast commit event", "error", err) 175 - return fmt.Errorf("failed to broadcast commit event: %w", err) 144 + logger.Error("failed to broadcast event", "error", err) 145 + return fmt.Errorf("failed to broadcast #commit event: %w", err) 176 146 } 177 147 178 148 return nil ··· 180 150 181 151 func (r *Relay) processSyncEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Sync, hostname string, hostID uint64) error { 182 152 logger := r.Logger.With("did", evt.Did, "seq", evt.Seq, "host", hostname, "eventType", "sync") 183 - did, err := syntax.ParseDID(evt.Did) 184 - if err != nil { 185 - return fmt.Errorf("invalid DID in message: %s", did) 186 - } 187 - // XXX: did.Normalize() 188 - account, err := r.GetAccount(ctx, did) 189 - if err != nil { 190 - if !errors.Is(err, gorm.ErrRecordNotFound) { 191 - return fmt.Errorf("looking up event user: %w", err) 192 - } 193 153 194 - host, err := r.GetHost(ctx, hostID) 195 - if err != nil { 196 - return err 197 - } 198 - account, err = r.CreateAccount(ctx, host, did) 199 - } 154 + acc, ident, err := r.preProcessEvent(ctx, evt.Did, hostname, hostID, logger) 200 155 if err != nil { 201 - return fmt.Errorf("could not get user for did %#v: %w", evt.Did, err) 156 + return err 202 157 } 203 158 204 - ident, err := r.dir.LookupDID(ctx, did) 205 - if err != nil { 206 - // XXX: handle more granularly (eg, true not-founds vs errors); and add tests 207 - logger.Warn("failed to load identity") 159 + if acc.Status != models.AccountStatusActive || acc.UpstreamStatus != models.AccountStatusActive { 160 + logger.Info("dropping commit message for non-active account", "status", acc.Status, "upstreamStatus", acc.UpstreamStatus) 161 + return nil 208 162 } 209 163 210 164 newRepo, err := r.VerifyRepoSync(ctx, evt, ident, hostname) ··· 212 166 return err 213 167 } 214 168 215 - // TODO: should this happen before or after firehose persist/broadcast? 216 - err = r.UpsertAccountRepo(account.UID, syntax.TID(newRepo.Rev), newRepo.CommitCID, newRepo.CommitData) 169 + err = r.UpsertAccountRepo(acc.UID, syntax.TID(newRepo.Rev), newRepo.CommitCID, newRepo.CommitData) 217 170 if err != nil { 218 - return fmt.Errorf("failed to upsert repo state (uid %d): %w", account.UID, err) 171 + return fmt.Errorf("failed to upsert account repo (%s): %w", acc.DID, err) 219 172 } 220 173 221 - // Broadcast the sync event to all consumers 174 + // emit the event 222 175 evtCopy := *evt 223 176 err = r.Events.AddEvent(ctx, &stream.XRPCStreamEvent{ 224 177 RepoSync: &evtCopy, 178 + PrivUid: acc.UID, 225 179 }) 226 180 if err != nil { 227 - logger.Error("failed to broadcast sync event", "error", err) 228 - return fmt.Errorf("failed to broadcast sync event: %w", err) 181 + logger.Error("failed to broadcast event", "error", err) 182 + return fmt.Errorf("failed to broadcast #sync event: %w", err) 229 183 } 230 - 231 184 return nil 232 185 } 233 186 234 187 func (r *Relay) processIdentityEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity, hostname string, hostID uint64) error { 235 188 logger := r.Logger.With("did", evt.Did, "seq", evt.Seq, "host", hostname, "eventType", "identity") 189 + 190 + // TODO: reduce verbosity? 236 191 logger.Info("relay got identity event") 237 192 238 - did, err := syntax.ParseDID(evt.Did) 193 + acc, _, err := r.preProcessEvent(ctx, evt.Did, hostname, hostID, logger) 239 194 if err != nil { 240 - return fmt.Errorf("invalid DID in message: %w", err) 195 + return err 241 196 } 197 + did := syntax.DID(acc.DID) 242 198 243 - // Flush any cached DID documents for this user 199 + // Flush any cached DID/identity info for this user 244 200 r.dir.Purge(ctx, did.AtIdentifier()) 245 201 if err != nil { 246 202 logger.Error("problem purging identity directory cache", "err", err) 247 203 } 248 204 249 - // XXX: syncHostAccount doesn't need full Host? 250 - host, err := r.GetHost(ctx, hostID) 251 - if err != nil { 252 - return err 253 - } 254 - 255 - // Refetch the DID doc and update our cached keys and handle etc. 256 - account, err := r.syncHostAccount(ctx, did, host, nil) 257 - if err != nil { 258 - return err 259 - } 260 - 261 205 // Broadcast the identity event to all consumers 262 206 err = r.Events.AddEvent(ctx, &stream.XRPCStreamEvent{ 263 207 RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{ 264 208 Did: did.String(), 265 - Seq: evt.Seq, 266 - Time: evt.Time, 267 - Handle: evt.Handle, 209 + Time: evt.Time, // TODO: update to now? 210 + Handle: evt.Handle, // TODO: we could substitute in our handle resolution here 268 211 }, 269 - PrivUid: account.UID, 212 + PrivUid: acc.UID, 270 213 }) 271 214 if err != nil { 272 - logger.Error("failed to broadcast Identity event", "error", err) 273 - return fmt.Errorf("failed to broadcast Identity event: %w", err) 215 + logger.Error("failed to broadcast identity event", "error", err) 216 + return fmt.Errorf("failed to broadcast #identity event: %w", err) 274 217 } 275 218 276 219 return nil ··· 287 230 attribute.Bool("active", evt.Active), 288 231 ) 289 232 290 - did, err := syntax.ParseDID(evt.Did) 233 + acc, _, err := r.preProcessEvent(ctx, evt.Did, hostname, hostID, logger) 291 234 if err != nil { 292 - return fmt.Errorf("invalid DID in message: %w", err) 235 + return err 293 236 } 294 237 295 238 if evt.Status != nil { ··· 298 241 logger.Info("relay got account event") 299 242 300 243 if !evt.Active && evt.Status == nil { 301 - // TODO: semantics here aren't really clear 302 - logger.Warn("dropping invalid account event", "active", evt.Active, "status", evt.Status) 303 - accountVerifyWarnings.WithLabelValues(hostname, "nostat").Inc() 304 - return nil 244 + // XXX: what should we do here? 245 + logger.Warn("invalid account event", "active", evt.Active, "status", evt.Status) 305 246 } 306 247 307 - // Flush any cached DID documents for this user 308 - r.dir.Purge(ctx, did.AtIdentifier()) 309 - if err != nil { 310 - logger.Error("problem purging identity directory cache", "err", err) 311 - } 312 - 313 - // XXX: shouldn't need full host? 314 - host, err := r.GetHost(ctx, hostID) 315 - if err != nil { 248 + // Process the upstream account status change 249 + if err := r.db.Model(models.Account{}).Where("uid = ?", acc.UID).Update("upstream_status", evt.Status).Error; err != nil { 316 250 return err 317 251 } 318 252 319 - // Refetch the DID doc to make sure the Host is still authoritative 320 - account, err := r.syncHostAccount(ctx, did, host, nil) 321 - if err != nil { 322 - span.RecordError(err) 323 - return err 253 + // wrangle various status codes in to what is expected in account event 254 + publicStatus := acc.Status 255 + if publicStatus == models.AccountStatusActive && evt.Status != nil { 256 + publicStatus = models.AccountStatus(*evt.Status) 324 257 } 325 - 326 - // Check if the Host is still authoritative 327 - // if not we don't want to be propagating this account event 328 - // XXX: lock 329 - if account.HostID != hostID && !r.Config.SkipAccountHostCheck { 330 - logger.Error("account event from non-authoritative pds", 331 - "event_from", hostname, 332 - "did_doc_declared_pds", account.HostID, 333 - "account_evt", evt, 334 - ) 335 - return fmt.Errorf("event from non-authoritative pds") 258 + publicActive := publicStatus == models.AccountStatusActive 259 + ptrStatus := (*string)(&publicStatus) 260 + if publicActive { 261 + ptrStatus = nil 336 262 } 337 263 338 - // Process the account status change 339 - repoStatus := models.AccountStatusActive 340 - if !evt.Active && evt.Status != nil { 341 - repoStatus = models.AccountStatus(*evt.Status) 342 - } 343 - 344 - // XXX: lock, and parse 345 - account.UpstreamStatus = models.AccountStatus(repoStatus) 346 - err = r.db.Save(account).Error 347 - if err != nil { 348 - span.RecordError(err) 349 - return fmt.Errorf("failed to update account status: %w", err) 350 - } 351 - 352 - shouldBeActive := evt.Active 353 - status := evt.Status 354 - 355 - // override with local status 356 - // XXX: lock 357 - if account.Status == "takendown" { 358 - shouldBeActive = false 359 - s := string(models.AccountStatusTakendown) 360 - status = &s 361 - } 362 - 363 - // Broadcast the account event to all consumers 264 + // emit the event 364 265 err = r.Events.AddEvent(ctx, &stream.XRPCStreamEvent{ 365 266 RepoAccount: &comatproto.SyncSubscribeRepos_Account{ 366 - Active: shouldBeActive, 367 - Did: evt.Did, 368 - Seq: evt.Seq, 369 - Status: status, 267 + Active: publicActive, 268 + Did: acc.DID, 269 + Status: ptrStatus, // TODO: sometimes will be "active" 370 270 Time: evt.Time, 371 271 }, 372 - PrivUid: account.UID, 272 + PrivUid: acc.UID, 373 273 }) 374 274 if err != nil { 375 - logger.Error("failed to broadcast Account event", "error", err) 376 - return fmt.Errorf("failed to broadcast Account event: %w", err) 275 + logger.Error("failed to broadcast event", "error", err) 276 + return fmt.Errorf("failed to broadcast #account event: %w", err) 377 277 } 378 278 379 279 return nil
+11 -31
cmd/rerelay/relay/slurper.go
··· 18 18 "github.com/gorilla/websocket" 19 19 ) 20 20 21 + // TODO: this isn't actually getting setup or used? 22 + var EventsTimeout = time.Minute 23 + 21 24 type ProcessMessageFunc func(ctx context.Context, evt *stream.XRPCStreamEvent, hostname string, hostID uint64) error 22 25 type PersistCursorFunc func(ctx context.Context, cursors *[]HostCursor) error 23 26 type PersistHostStatusFunc func(ctx context.Context, hostID uint64, state models.HostStatus) error ··· 78 81 type Subscription struct { 79 82 Hostname string 80 83 HostID uint64 81 - LastSeq int64 // XXX: switch to an atomic 82 - Limiters *Limiters 84 + LastSeq int64 // XXX: switch to an atomic instead of lock? 85 + Limiters *Limiters // XXX: is this used? or only the separate limiters on Slurper? 83 86 84 87 lk sync.RWMutex 85 88 ctx context.Context ··· 152 155 return s.Limiters[hostID] 153 156 } 154 157 155 - /* 156 - XXX 157 - 158 - func (s *Slurper) GetOrCreateLimiters(pdsID uint64, perSecLimit int64, perHourLimit int64, perDayLimit int64) *Limiters { 159 - s.LimitMtx.RLock() 160 - defer s.LimitMtx.RUnlock() 161 - lim, ok := s.Limiters[pdsID] 162 - if !ok { 163 - perSec, _ := slidingwindow.NewLimiter(time.Second, perSecLimit, windowFunc) 164 - perHour, _ := slidingwindow.NewLimiter(time.Hour, perHourLimit, windowFunc) 165 - perDay, _ := slidingwindow.NewLimiter(time.Hour*24, perDayLimit, windowFunc) 166 - lim = &Limiters{ 167 - PerSecond: perSec, 168 - PerHour: perHour, 169 - PerDay: perDay, 170 - } 171 - s.Limiters[pdsID] = lim 172 - } 173 - 174 - return lim 175 - } 176 - */ 177 158 func (s *Slurper) GetOrCreateLimiters(hostID uint64) *Limiters { 178 159 s.LimitMtx.RLock() 179 160 defer s.LimitMtx.RUnlock() ··· 314 295 curCursor := cursor 315 296 if err := s.handleConnection(ctx, conn, &cursor, sub); err != nil { 316 297 if errors.Is(err, ErrTimeoutShutdown) { 317 - s.logger.Info("shutting down host subscription after timeout", "host", host.Hostname, "time", EventsTimeout) 298 + s.logger.Info("shutting down host subscription after timeout", "host", host.Hostname, "time", EventsTimeout.String()) 318 299 return 319 300 } 320 301 s.logger.Warn("connection to failed", "host", host.Hostname, "err", err) ··· 339 320 return time.Second * 30 340 321 } 341 322 342 - var EventsTimeout = time.Minute 343 - 344 323 func (s *Slurper) handleConnection(ctx context.Context, conn *websocket.Conn, lastCursor *int64, sub *Subscription) error { 345 324 ctx, cancel := context.WithCancel(ctx) 346 325 defer cancel() ··· 465 444 lims.PerDay, 466 445 } 467 446 447 + // NOTE: this is where limiters get injected and enforced 468 448 instrumentedRSC := stream.NewInstrumentedRepoStreamCallbacks(limiters, rsc.EventHandler) 469 449 470 450 pool := parallel.NewScheduler( 471 - 100, 472 - 1_000, 451 + 100, // XXX: concurrency 452 + 1_000, // XXX: max queue per host 473 453 conn.RemoteAddr().String(), 474 454 instrumentedRSC.EventHandler, 475 455 ) ··· 483 463 484 464 // persistCursors sends all cursors to callback to be persisted in database (if registered) 485 465 func (s *Slurper) persistCursors(ctx context.Context) error { 486 - if s.Config.PersistCursorCallback != nil { 466 + if s.Config.PersistCursorCallback == nil { 487 467 s.logger.Warn("skipping cursor persist because no PersistCursorCallback registered") 488 468 return nil 489 469 } ··· 505 485 s.subsLk.Unlock() 506 486 507 487 err := s.Config.PersistCursorCallback(ctx, &cursors) 508 - s.logger.Info("finished persisting cursors", "count", len(cursors), "duration", time.Since(start), "err", err) 488 + s.logger.Info("finished persisting cursors", "count", len(cursors), "duration", time.Since(start).String(), "err", err) 509 489 return err 510 490 } 511 491
+2 -2
cmd/rerelay/testing/runner.go
··· 140 140 if !EqualEvents(evt, evts[0]) { 141 141 //evt.RepoCommit.Blocks = nil 142 142 //evts[0].RepoCommit.Blocks = nil 143 - fmt.Printf("%+v\n", evt.RepoCommit) 144 - fmt.Printf("%+v\n", evts[0].RepoCommit) 143 + fmt.Printf("%+v\n", evt.RepoAccount) 144 + fmt.Printf("%+v\n", evts[0].RepoAccount) 145 145 return fmt.Errorf("events didn't match") 146 146 } 147 147 } else {