this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fold slurper and validator back in to relay package

+22 -38
+4 -3
cmd/relayered/handlers.go
··· 12 12 13 13 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 14 "github.com/bluesky-social/indigo/cmd/relayered/relay" 15 + "github.com/bluesky-social/indigo/cmd/relayered/relay/models" 15 16 "github.com/bluesky-social/indigo/xrpc" 16 17 17 18 "github.com/labstack/echo/v4" ··· 165 166 } 166 167 167 168 ustatus := u.GetUpstreamStatus() 168 - if ustatus == relay.AccountStatusTakendown { 169 + if ustatus == models.AccountStatusTakendown { 169 170 return nil, fmt.Errorf("account was taken down by its PDS") 170 171 } 171 172 172 - if ustatus == relay.AccountStatusDeactivated { 173 + if ustatus == models.AccountStatusDeactivated { 173 174 return nil, fmt.Errorf("account is temporarily deactivated") 174 175 } 175 176 176 - if ustatus == relay.AccountStatusSuspended { 177 + if ustatus == models.AccountStatusSuspended { 177 178 return nil, fmt.Errorf("account is suspended by its PDS") 178 179 } 179 180
+1 -2
cmd/relayered/main.go
··· 19 19 20 20 "github.com/bluesky-social/indigo/atproto/identity" 21 21 "github.com/bluesky-social/indigo/cmd/relayered/relay" 22 - "github.com/bluesky-social/indigo/cmd/relayered/relay/validator" 23 22 "github.com/bluesky-social/indigo/cmd/relayered/stream/eventmgr" 24 23 "github.com/bluesky-social/indigo/cmd/relayered/stream/persist/diskpersist" 25 24 "github.com/bluesky-social/indigo/util" ··· 250 249 } 251 250 252 251 evtman := eventmgr.NewEventManager(persister) 253 - vldtr := validator.NewValidator(&dir) 252 + vldtr := relay.NewValidator(&dir) 254 253 255 254 logger.Info("constructing relay service") 256 255 r, err := relay.NewRelay(db, vldtr, evtman, &dir, relayConfig)
-13
cmd/relayered/relay/account_status.go
··· 1 - package relay 2 - 3 - var ( 4 - // AccountStatusActive is not in the spec but used internally 5 - AccountStatusActive = "active" 6 - 7 - AccountStatusDeactivated = "deactivated" 8 - AccountStatusDeleted = "deleted" 9 - AccountStatusDesynchronized = "desynchronized" 10 - AccountStatusSuspended = "suspended" 11 - AccountStatusTakendown = "takendown" 12 - AccountStatusThrottled = "throttled" 13 - )
+5 -5
cmd/relayered/relay/firehose.go
··· 110 110 } 111 111 112 112 // Process the account status change 113 - repoStatus := AccountStatusActive 113 + repoStatus := models.AccountStatusActive 114 114 if !env.RepoAccount.Active && env.RepoAccount.Status != nil { 115 115 repoStatus = *env.RepoAccount.Status 116 116 } ··· 128 128 // override with local status 129 129 if account.GetTakenDown() { 130 130 shouldBeActive = false 131 - status = &AccountStatusTakendown 131 + status = &models.AccountStatusTakendown 132 132 } 133 133 134 134 // Broadcast the account event to all consumers ··· 184 184 185 185 ustatus := account.GetUpstreamStatus() 186 186 187 - if account.GetTakenDown() || ustatus == AccountStatusTakendown { 187 + if account.GetTakenDown() || ustatus == models.AccountStatusTakendown { 188 188 r.Logger.Debug("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host) 189 189 repoCommitsResultCounter.WithLabelValues(host.Host, "tdu").Inc() 190 190 return nil 191 191 } 192 192 193 - if ustatus == AccountStatusSuspended { 193 + if ustatus == models.AccountStatusSuspended { 194 194 r.Logger.Debug("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host) 195 195 repoCommitsResultCounter.WithLabelValues(host.Host, "susu").Inc() 196 196 return nil 197 197 } 198 198 199 - if ustatus == AccountStatusDeactivated { 199 + if ustatus == models.AccountStatusDeactivated { 200 200 r.Logger.Debug("dropping commit event from deactivated user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host) 201 201 repoCommitsResultCounter.WithLabelValues(host.Host, "du").Inc() 202 202 return nil
+5 -7
cmd/relayered/relay/relay.go
··· 5 5 "sync" 6 6 7 7 "github.com/bluesky-social/indigo/atproto/identity" 8 - "github.com/bluesky-social/indigo/cmd/relayered/relay/slurper" 9 8 "github.com/bluesky-social/indigo/cmd/relayered/relay/models" 10 - "github.com/bluesky-social/indigo/cmd/relayered/relay/validator" 11 9 "github.com/bluesky-social/indigo/cmd/relayered/stream/eventmgr" 12 10 "github.com/bluesky-social/indigo/xrpc" 13 11 ··· 22 20 db *gorm.DB 23 21 dir identity.Directory 24 22 Logger *slog.Logger 25 - Slurper *slurper.Slurper 23 + Slurper *Slurper 26 24 Events *eventmgr.EventManager 27 - Validator *validator.Validator 25 + Validator *Validator 28 26 Config RelayConfig 29 27 30 28 // extUserLk serializes a section of syncPDSAccount() ··· 59 57 } 60 58 } 61 59 62 - func NewRelay(db *gorm.DB, vldtr *validator.Validator, evtman *eventmgr.EventManager, dir identity.Directory, config *RelayConfig) (*Relay, error) { 60 + func NewRelay(db *gorm.DB, vldtr *Validator, evtman *eventmgr.EventManager, dir identity.Directory, config *RelayConfig) (*Relay, error) { 63 61 64 62 if config == nil { 65 63 config = DefaultRelayConfig() ··· 85 83 return nil, err 86 84 } 87 85 88 - slOpts := slurper.DefaultSlurperConfig() 86 + slOpts := DefaultSlurperConfig() 89 87 slOpts.SSL = config.SSL 90 88 slOpts.DefaultRepoLimit = config.DefaultRepoLimit 91 89 slOpts.ConcurrencyPerPDS = config.ConcurrencyPerPDS 92 90 slOpts.MaxQueuePerPDS = config.MaxQueuePerPDS 93 - s, err := slurper.NewSlurper(db, r.handleFedEvent, slOpts, r.Logger) 91 + s, err := NewSlurper(db, r.handleFedEvent, slOpts, r.Logger) 94 92 if err != nil { 95 93 return nil, err 96 94 }
+1 -1
cmd/relayered/relay/slurper/metrics.go cmd/relayered/relay/metrics_slurper.go
··· 1 - package slurper 1 + package relay 2 2 3 3 import ( 4 4 "github.com/prometheus/client_golang/prometheus"
+1 -1
cmd/relayered/relay/slurper/rate_limits.go cmd/relayered/relay/rate_limits.go
··· 1 - package slurper 1 + package relay 2 2 3 3 type PDSRates struct { 4 4 // core event rate, counts firehose events
+2 -2
cmd/relayered/relay/slurper/slurper.go cmd/relayered/relay/slurper.go
··· 1 - package slurper 1 + package relay 2 2 3 3 import ( 4 4 "context" ··· 12 12 13 13 "github.com/RussellLuo/slidingwindow" 14 14 comatproto "github.com/bluesky-social/indigo/api/atproto" 15 - "github.com/bluesky-social/indigo/cmd/relayered/stream" 16 15 "github.com/bluesky-social/indigo/cmd/relayered/relay/models" 16 + "github.com/bluesky-social/indigo/cmd/relayered/stream" 17 17 "github.com/bluesky-social/indigo/cmd/relayered/stream/schedulers/parallel" 18 18 19 19 "github.com/gorilla/websocket"
+1 -1
cmd/relayered/relay/validator/metrics.go cmd/relayered/relay/metrics_validator.go
··· 1 - package validator 1 + package relay 2 2 3 3 import ( 4 4 "github.com/prometheus/client_golang/prometheus"
+1 -1
cmd/relayered/relay/validator/validator.go cmd/relayered/relay/validator.go
··· 1 - package validator 1 + package relay 2 2 3 3 import ( 4 4 "bytes"
+1 -2
cmd/relayered/testing/runner.go
··· 12 12 13 13 "github.com/bluesky-social/indigo/atproto/identity" 14 14 "github.com/bluesky-social/indigo/cmd/relayered/relay" 15 - "github.com/bluesky-social/indigo/cmd/relayered/relay/validator" 16 15 "github.com/bluesky-social/indigo/cmd/relayered/stream" 17 16 "github.com/bluesky-social/indigo/cmd/relayered/stream/eventmgr" 18 17 "github.com/bluesky-social/indigo/cmd/relayered/stream/persist/diskpersist" ··· 43 42 if err != nil { 44 43 panic(err) 45 44 } 46 - vldtr := validator.NewValidator(dir) 45 + vldtr := relay.NewValidator(dir) 47 46 evtman := eventmgr.NewEventManager(persister) 48 47 49 48 r, err := relay.NewRelay(db, vldtr, evtman, dir, relayConfig)