this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

move some db access into relay pkg

+49 -18
+7 -10
cmd/relayered/handlers.go
··· 107 107 } 108 108 109 109 func (s *Service) handleComAtprotoSyncListRepos(ctx context.Context, cursor int64, limit int) (*comatproto.SyncListRepos_Output, error) { 110 - // Load the accounts 111 - accounts := []*slurper.Account{} 112 - if err := s.db.Model(&slurper.Account{}).Where("id > ? AND NOT taken_down AND (upstream_status IS NULL OR upstream_status = 'active')", cursor).Order("id").Limit(limit).Find(&accounts).Error; err != nil { 110 + accounts, err := s.relay.ListAccounts(ctx, cursor, limit) 111 + if err != nil { 113 112 if err == gorm.ErrRecordNotFound { 114 113 return &comatproto.SyncListRepos_Output{}, nil 115 114 } ··· 179 178 return nil, fmt.Errorf("account is suspended by its PDS") 180 179 } 181 180 182 - var prevState slurper.AccountPreviousState 183 - err = s.db.First(&prevState, u.ID).Error 184 - if err == nil { 185 - // okay! 186 - } else if errors.Is(err, gorm.ErrRecordNotFound) { 187 - return nil, relay.ErrUserStatusUnavailable 188 - } else { 181 + prevState, err := s.relay.GetAccountPreviousState(ctx, u.ID) 182 + if err != nil { 183 + if errors.Is(err, gorm.ErrRecordNotFound) { 184 + return nil, relay.ErrUserStatusUnavailable 185 + } 189 186 s.log.Error("user db err", "err", err) 190 187 return nil, fmt.Errorf("user prev db err, %w", err) 191 188 }
-5
cmd/relayered/handlers_admin.go
··· 178 178 UserCount int64 `json:"UserCount"` 179 179 } 180 180 181 - type UserCount struct { 182 - PDSID uint `gorm:"column:pds"` 183 - UserCount int64 `gorm:"column:user_count"` 184 - } 185 - 186 181 func (svc *Service) handleListPDSs(e echo.Context) error { 187 182 var pds []slurper.PDS 188 183 if err := svc.db.Find(&pds).Error; err != nil {
+31
cmd/relayered/relay/account.go
··· 290 290 return nil 291 291 } 292 292 293 + func (r *Relay) GetAccountPreviousState(ctx context.Context, uid models.Uid) (*slurper.AccountPreviousState, error) { 294 + var prevState slurper.AccountPreviousState 295 + if err := r.db.First(&prevState, uid).Error; err != nil { 296 + if errors.Is(err, gorm.ErrRecordNotFound) { 297 + return nil, ErrUserStatusUnavailable 298 + } 299 + r.Logger.Error("user db err", "err", err) 300 + return nil, err 301 + } 302 + return &prevState, nil 303 + } 304 + 293 305 func (r *Relay) GetRepoRoot(ctx context.Context, user models.Uid) (cid.Cid, error) { 294 306 var prevState slurper.AccountPreviousState 295 307 err := r.db.First(&prevState, user).Error ··· 302 314 return cid.Cid{}, fmt.Errorf("user prev db err, %w", err) 303 315 } 304 316 } 317 + 318 + func (r *Relay) GetHostForDID(ctx context.Context, did string) (string, error) { 319 + var pdsHostname string 320 + // TODO: use gorm, not "Raw" 321 + err := r.db.Raw("SELECT pds.host FROM users JOIN pds ON users.pds = pds.id WHERE users.did = ?", did).Scan(&pdsHostname).Error 322 + if err != nil { 323 + return "", err 324 + } 325 + return pdsHostname, nil 326 + } 327 + 328 + func (r *Relay) ListAccounts(ctx context.Context, cursor int64, limit int) ([]*slurper.Account, error) { 329 + 330 + accounts := []*slurper.Account{} 331 + if err := r.db.Model(&slurper.Account{}).Where("id > ? AND NOT taken_down AND (upstream_status IS NULL OR upstream_status = 'active')", cursor).Order("id").Limit(limit).Find(&accounts).Error; err != nil { 332 + return nil, err 333 + } 334 + return accounts, nil 335 + }
+5
cmd/relayered/relay/relay.go
··· 122 122 return nil 123 123 } 124 124 125 + // simple check of connection to database 126 + func (r *Relay) Healthcheck() error { 127 + return r.db.Exec("SELECT 1").Error 128 + } 129 + 125 130 type SocketConsumer struct { 126 131 UserAgent string 127 132 RemoteAddr string
+1 -1
cmd/relayered/service.go
··· 218 218 } 219 219 220 220 func (svc *Service) HandleHealthCheck(c echo.Context) error { 221 - if err := svc.db.Exec("SELECT 1").Error; err != nil { 221 + if err := svc.relay.Healthcheck(); err != nil { 222 222 svc.log.Error("healthcheck can't connect to database", "err", err) 223 223 return c.JSON(500, HealthStatus{Status: "error", Message: "can't connect to database"}) 224 224 } else {
+5 -2
cmd/relayered/stubs.go
··· 83 83 // HandleComAtprotoSyncGetRepo handles /xrpc/com.atproto.sync.getRepo 84 84 // returns 3xx to same URL at source PDS 85 85 func (s *Service) HandleComAtprotoSyncGetRepo(c echo.Context) error { 86 + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetRepo") 87 + defer span.End() 86 88 // no request object, only params 87 89 params := c.QueryParams() 88 90 var did string ··· 106 108 return c.JSON(http.StatusBadRequest, XRPCError{Message: "need did param"}) 107 109 } 108 110 109 - var pdsHostname string 110 - err := s.db.Raw("SELECT pds.host FROM users JOIN pds ON users.pds = pds.id WHERE users.did = ?", did).Scan(&pdsHostname).Error 111 + pdsHostname, err := s.relay.GetHostForDID(ctx, did) 112 + 113 + // TODO: proper error responses 111 114 if err != nil { 112 115 if errors.Is(err, gorm.ErrRecordNotFound) { 113 116 return c.JSON(http.StatusNotFound, XRPCError{Message: "NULL"})