this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor handlers and stubs

+300 -184
+134 -77
cmd/rerelay/handlers.go
··· 1 1 package main 2 2 3 3 import ( 4 - "bytes" 5 - "context" 6 - "encoding/json" 7 4 "errors" 8 5 "fmt" 9 6 "net/http" 10 - "net/url" 11 7 "strings" 12 8 13 9 comatproto "github.com/bluesky-social/indigo/api/atproto" 14 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 + "github.com/bluesky-social/indigo/cmd/rerelay/relay" 15 12 "github.com/bluesky-social/indigo/cmd/rerelay/relay/models" 16 13 "github.com/bluesky-social/indigo/xrpc" 17 14 18 15 "github.com/labstack/echo/v4" 19 - "gorm.io/gorm" 20 16 ) 21 17 22 - func (s *Service) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatproto.SyncRequestCrawl_Input) error { 23 - host := body.Hostname 24 - if host == "" { 25 - return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname") 26 - } 18 + func (s *Service) handleComAtprotoSyncRequestCrawl(c echo.Context, body *comatproto.SyncRequestCrawl_Input) error { 19 + ctx := c.Request().Context() 27 20 28 - if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") { 29 - if s.relay.Config.SSL { 30 - host = "https://" + host 31 - } else { 32 - host = "http://" + host 33 - } 34 - } 35 - 36 - u, err := url.Parse(host) 21 + hostname, noSSL, err := relay.ParseHostname(body.Hostname) 37 22 if err != nil { 38 - return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname") 23 + return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("hostname field empty or invalid: %s", body.Hostname)}) 39 24 } 40 25 41 - if u.Scheme == "http" && s.relay.Config.SSL { 42 - return echo.NewHTTPError(http.StatusBadRequest, "this server requires https") 26 + if noSSL && !s.relay.Config.SSL { 27 + return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: "This relay requires SSL"}) 43 28 } 44 29 45 - if u.Scheme == "https" && !s.relay.Config.SSL { 46 - return echo.NewHTTPError(http.StatusBadRequest, "this server does not support https") 47 - } 48 - 49 - if u.Path != "" { 50 - return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without path") 51 - } 52 - 53 - if u.Query().Encode() != "" { 54 - return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without query") 55 - } 30 + // TODO: could ensure that query and path are empty 56 31 57 - host = u.Host // potentially hostname:port 32 + // XXX: config if new PDS instances are allowed at all 58 33 59 - banned, err := s.relay.DomainIsBanned(ctx, host) 60 - if banned { 61 - return echo.NewHTTPError(http.StatusUnauthorized, "domain is banned") 62 - } 63 - 64 - s.logger.Warn("XXX: better host validation for crawl requests") 65 - 66 - clientHost := fmt.Sprintf("%s://%s", u.Scheme, host) 67 - 68 - c := &xrpc.Client{ 69 - Host: clientHost, 70 - Client: http.DefaultClient, // not using the client that auto-retries 34 + if strings.HasPrefix(hostname, "localhost:") { 35 + // XXX: config if localhost connections allowed 36 + } else { 37 + banned, err := s.relay.DomainIsBanned(ctx, hostname) 38 + if err != nil { 39 + return nil 40 + } 41 + if banned { 42 + return c.JSON(http.StatusUnauthorized, xrpc.XRPCError{ErrStr: "DomainBan", Message: "host domain is banned"}) 43 + } 71 44 } 72 45 73 - desc, err := comatproto.ServerDescribeServer(ctx, c) 74 - if err != nil { 75 - errMsg := fmt.Sprintf("requested host (%s) failed to respond to describe request", clientHost) 76 - return echo.NewHTTPError(http.StatusBadRequest, errMsg) 46 + if err := s.relay.HostChecker.CheckHost(ctx, hostname); err != nil { 47 + return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "HostNotFound", Message: fmt.Sprintf("host server unreachable: %s", err)}) 77 48 } 78 49 79 - // Maybe we could do something with this response later 80 - _ = desc 81 - 50 + /* XXX: forwarding requestCrawl should be handled by rainbow, not relay itself 82 51 if len(s.config.NextCrawlers) != 0 { 83 52 blob, err := json.Marshal(body) 84 53 if err != nil { ··· 102 71 }(blob) 103 72 } 104 73 } 74 + */ 105 75 106 - return s.relay.SubscribeToHost(host, true, false, nil) 76 + return s.relay.SubscribeToHost(hostname, noSSL, false) 77 + } 78 + 79 + func (s *Service) handleComAtprotoSyncListHosts(c echo.Context, cursor int64, limit int) (*comatproto.SyncListHosts_Output, error) { 80 + ctx := c.Request().Context() 81 + 82 + hosts, err := s.relay.ListHosts(ctx, cursor, limit) 83 + if err != nil { 84 + return nil, c.JSON(http.StatusInternalServerError, xrpc.XRPCError{ErrStr: "DatabaseError", Message: "failed to list hosts"}) 85 + } 86 + 87 + if len(hosts) == 0 { 88 + // resp.Hosts is an explicit empty array, not just 'nil' 89 + return &comatproto.SyncListHosts_Output{ 90 + Hosts: []*comatproto.SyncListHosts_Host{}, 91 + }, nil 92 + } 93 + 94 + resp := &comatproto.SyncListHosts_Output{ 95 + Hosts: make([]*comatproto.SyncListHosts_Host, len(hosts)), 96 + } 97 + 98 + for i, host := range hosts { 99 + resp.Hosts[i] = &comatproto.SyncListHosts_Host{ 100 + // TODO: AccountCount 101 + Hostname: host.Hostname, 102 + Seq: &host.LastSeq, 103 + Status: (*string)(&host.Status), 104 + } 105 + } 106 + 107 + // If this is not the last page, set the cursor 108 + if len(hosts) >= limit && len(hosts) > 1 { 109 + nextCursor := fmt.Sprintf("%d", hosts[len(hosts)-1].ID) 110 + resp.Cursor = &nextCursor 111 + } 112 + 113 + return resp, nil 114 + } 115 + 116 + func (s *Service) handleComAtprotoSyncGetHostStatus(c echo.Context, hostname string) (*comatproto.SyncGetHostStatus_Output, error) { 117 + ctx := c.Request().Context() 118 + 119 + host, err := s.relay.GetHost(ctx, hostname) 120 + if err != nil { 121 + if errors.Is(err, relay.ErrHostNotFound) { 122 + // TODO: test that not found DID is a 404 123 + return nil, c.JSON(http.StatusNotFound, xrpc.XRPCError{ErrStr: "HostNotFound", Message: "host not found"}) 124 + } 125 + return nil, c.JSON(http.StatusInternalServerError, xrpc.XRPCError{ErrStr: "DatabaseError", Message: "looking up host information"}) 126 + } 127 + 128 + out := &comatproto.SyncGetHostStatus_Output{ 129 + // TODO: AccountCount 130 + Hostname: host.Hostname, 131 + Seq: &host.LastSeq, 132 + Status: (*string)(&host.Status), 133 + } 134 + 135 + return out, nil 107 136 } 108 137 109 - func (s *Service) handleComAtprotoSyncListRepos(ctx context.Context, cursor int64, limit int) (*comatproto.SyncListRepos_Output, error) { 138 + func (s *Service) handleComAtprotoSyncListRepos(c echo.Context, cursor int64, limit int) (*comatproto.SyncListRepos_Output, error) { 139 + ctx := c.Request().Context() 140 + 141 + // XXX: document that ListAccounts is ordered by UID (ascending) 110 142 accounts, err := s.relay.ListAccounts(ctx, cursor, limit) 111 143 if err != nil { 112 - if err == gorm.ErrRecordNotFound { 113 - return &comatproto.SyncListRepos_Output{}, nil 114 - } 115 144 s.logger.Error("failed to query accounts", "err", err) 116 - return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to query accounts") 145 + return nil, c.JSON(http.StatusInternalServerError, xrpc.XRPCError{ErrStr: "DatabaseError", Message: "failed to list accounts (repos)"}) 117 146 } 118 147 119 148 if len(accounts) == 0 { ··· 128 157 } 129 158 130 159 // Fetch the repo roots for each user 160 + // TODO: would be much more efficient to do a join and have Relay.ListAccounts return these repos with the account info 131 161 for i, acc := range accounts { 132 162 repo, err := s.relay.GetAccountRepo(ctx, acc.UID) 133 163 if err != nil { ··· 150 180 return resp, nil 151 181 } 152 182 153 - func (s *Service) handleComAtprotoSyncGetLatestCommit(ctx context.Context, rawDID string) (*comatproto.SyncGetLatestCommit_Output, error) { 154 - did, err := syntax.ParseDID(rawDID) 155 - if err != nil { 156 - return nil, fmt.Errorf("invalid DID parameter: %w", err) 157 - } 183 + func (s *Service) handleComAtprotoSyncGetRepoStatus(c echo.Context, did syntax.DID) (*comatproto.SyncGetRepoStatus_Output, error) { 184 + ctx := c.Request().Context() 185 + 158 186 acc, err := s.relay.GetAccount(ctx, did) 159 187 if err != nil { 160 - if errors.Is(err, gorm.ErrRecordNotFound) { 161 - return nil, echo.NewHTTPError(http.StatusNotFound, "user not found") 188 + if errors.Is(err, relay.ErrAccountNotFound) { 189 + // TODO: test that not found DID is a 404 190 + return nil, c.JSON(http.StatusNotFound, xrpc.XRPCError{ErrStr: "RepoNotFound", Message: "account not found"}) 162 191 } 163 - return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user") 192 + return nil, c.JSON(http.StatusInternalServerError, xrpc.XRPCError{ErrStr: "DatabaseError", Message: "looking up account information"}) 164 193 } 165 194 166 - if acc.Status == models.AccountStatusTakendown { 167 - return nil, fmt.Errorf("account was taken down by the Relay") 195 + out := &comatproto.SyncGetRepoStatus_Output{ 196 + Did: did.String(), 197 + Active: acc.IsActive(), 198 + Status: acc.StatusField(), 168 199 } 169 200 170 - if acc.UpstreamStatus == models.AccountStatusTakendown { 171 - return nil, fmt.Errorf("account was taken down by its PDS") 201 + repo, err := s.relay.GetAccountRepo(ctx, acc.UID) 202 + if err != nil && !errors.Is(err, relay.ErrAccountRepoNotFound) { 203 + return nil, err 172 204 } 173 205 174 - if acc.Status == models.AccountStatusDeactivated { 175 - return nil, fmt.Errorf("account is temporarily deactivated") 206 + out.Rev = &repo.Rev 207 + 208 + return out, nil 209 + } 210 + 211 + func (s *Service) handleComAtprotoSyncGetLatestCommit(c echo.Context, did syntax.DID) (*comatproto.SyncGetLatestCommit_Output, error) { 212 + ctx := c.Request().Context() 213 + 214 + acc, err := s.relay.GetAccount(ctx, did) 215 + if err != nil { 216 + if errors.Is(err, relay.ErrAccountNotFound) { 217 + // TODO: test that not found DID is a 404 218 + return nil, c.JSON(http.StatusNotFound, xrpc.XRPCError{ErrStr: "RepoNotFound", Message: "account not found"}) 219 + } 220 + return nil, c.JSON(http.StatusInternalServerError, xrpc.XRPCError{ErrStr: "DatabaseError", Message: "looking up account information"}) 176 221 } 177 222 178 - if acc.Status == models.AccountStatusSuspended { 179 - return nil, fmt.Errorf("account is suspended by its PDS") 223 + switch acc.AccountStatus() { 224 + case models.AccountStatusTakendown, models.AccountStatusSuspended: 225 + return nil, c.JSON(http.StatusForbidden, xrpc.XRPCError{ErrStr: "RepoTakendown", Message: "account not active (takendown)"}) 226 + case models.AccountStatusDeactivated: 227 + return nil, c.JSON(http.StatusForbidden, xrpc.XRPCError{ErrStr: "RepoDeactivated", Message: "account not active (deactivated)"}) 228 + case models.AccountStatusDeleted: 229 + return nil, c.JSON(http.StatusForbidden, xrpc.XRPCError{ErrStr: "RepoDeleted", Message: "account not active (deleted)"}) 230 + case models.AccountStatusActive: 231 + // pass 232 + default: 233 + return nil, c.JSON(http.StatusForbidden, xrpc.XRPCError{ErrStr: "RepoInactive", Message: fmt.Sprintf("account not active: %s", acc.AccountStatus())}) 180 234 } 181 235 182 236 repo, err := s.relay.GetAccountRepo(ctx, acc.UID) 183 237 if err != nil { 238 + if errors.Is(err, relay.ErrAccountRepoNotFound) { 239 + // XXX: return partial result? some special error? desynchronized? 240 + } 184 241 return nil, err 185 242 } 186 243 ··· 198 255 func (svc *Service) HandleHealthCheck(c echo.Context) error { 199 256 if err := svc.relay.Healthcheck(); err != nil { 200 257 svc.logger.Error("healthcheck can't connect to database", "err", err) 201 - return c.JSON(500, HealthStatus{Status: "error", Message: "can't connect to database"}) 258 + return c.JSON(http.StatusInternalServerError, HealthStatus{Status: "error", Message: "can't connect to database"}) 202 259 } else { 203 - return c.JSON(200, HealthStatus{Status: "ok"}) 260 + return c.JSON(http.StatusOK, HealthStatus{Status: "ok"}) 204 261 } 205 262 } 206 263
+3 -2
cmd/rerelay/relay/account.go
··· 61 61 //newUsersDiscovered.Inc() 62 62 //start := time.Now() 63 63 64 - ident, err := r.dir.LookupDID(ctx, did) 64 + ident, err := r.Dir.LookupDID(ctx, did) 65 65 if err != nil { 66 66 return nil, fmt.Errorf("new account identity resolution: %w", err) 67 67 } ··· 124 124 return nil 125 125 } 126 126 127 - ident, err := r.dir.LookupDID(ctx, did) 127 + ident, err := r.Dir.LookupDID(ctx, did) 128 128 if err != nil { 129 129 return fmt.Errorf("account identity resolution: %w", err) 130 130 } ··· 195 195 196 196 func (r *Relay) ListAccounts(ctx context.Context, cursor int64, limit int) ([]*models.Account, error) { 197 197 198 + // XXX: what status filter should be in place here? not deleted in addition to not takendown? 198 199 accounts := []*models.Account{} 199 200 if err := r.db.Model(&models.Account{}).Where("uid > ? AND status IS NOT 'takendown' AND (upstream_status IS NULL OR upstream_status = 'active')", cursor).Order("uid").Limit(limit).Find(&accounts).Error; err != nil { 200 201 return nil, err
+6 -5
cmd/rerelay/relay/crawl.go
··· 34 34 return !r.Config.DisableNewHosts 35 35 } 36 36 37 - func (r *Relay) SubscribeToHost(hostname string, reg bool, adminOverride bool, rateOverrides *HostRates) error { 37 + func (r *Relay) SubscribeToHost(hostname string, noSSL, adminForce bool) error { 38 38 39 39 // if we already have an active subscription going, exit early 40 40 if r.Slurper.CheckIfSubscribed(hostname) { 41 41 return nil 42 42 } 43 43 44 + // XXX: new PDS daily rate-limit 45 + 46 + newHost := false 44 47 var host models.Host 45 48 if err := r.db.Find(&host, "hostname = ?", hostname).Error; err != nil { 46 49 return err 47 50 } 48 51 49 - newHost := false 50 - 51 52 if host.ID == 0 { 52 - if !adminOverride && !r.canSlurpHost(hostname) { 53 + if !adminForce && !r.canSlurpHost(hostname) { 53 54 return ErrNewSubsDisabled 54 55 } 55 56 // New PDS! 56 57 npds := models.Host{ 57 58 Hostname: hostname, 58 - NoSSL: !r.Config.SSL, 59 + NoSSL: noSSL, 59 60 Status: models.HostStatusActive, 60 61 AccountLimit: r.Config.DefaultRepoLimit, 61 62 }
+32 -4
cmd/rerelay/relay/host.go
··· 13 13 "gorm.io/gorm" 14 14 ) 15 15 16 - // XXX: GetHost (by hostname) vs GetHostByID 17 - 18 - func (r *Relay) GetHost(ctx context.Context, hostID uint64) (*models.Host, error) { 16 + func (r *Relay) GetHost(ctx context.Context, hostname string) (*models.Host, error) { 19 17 ctx, span := tracer.Start(ctx, "getHost") 20 18 defer span.End() 21 19 22 20 var host models.Host 21 + if err := r.db.Model(models.Host{}).Where("hostname = ?", hostname).First(&host).Error; err != nil { 22 + if errors.Is(err, gorm.ErrRecordNotFound) { 23 + return nil, ErrHostNotFound 24 + } 25 + return nil, err 26 + } 27 + 28 + // TODO: is this further check needed? 29 + if host.ID == 0 { 30 + return nil, ErrHostNotFound 31 + } 32 + 33 + return &host, nil 34 + } 35 + 36 + func (r *Relay) GetHostByID(ctx context.Context, hostID uint64) (*models.Host, error) { 37 + ctx, span := tracer.Start(ctx, "getHostByID") 38 + defer span.End() 39 + 40 + var host models.Host 23 41 if err := r.db.Find(&host, hostID).Error; err != nil { 24 42 if errors.Is(err, gorm.ErrRecordNotFound) { 25 43 return nil, ErrHostNotFound ··· 29 47 30 48 // TODO: is this further check needed? 31 49 if host.ID == 0 { 32 - return nil, ErrAccountNotFound 50 + return nil, ErrHostNotFound 33 51 } 34 52 35 53 return &host, nil 54 + } 55 + 56 + func (r *Relay) ListHosts(ctx context.Context, cursor int64, limit int) ([]*models.Host, error) { 57 + 58 + // TODO: filter based on active status? 59 + hosts := []*models.Host{} 60 + if err := r.db.Model(&models.Host{}).Where("id > ?", cursor).Order("id").Limit(limit).Find(&hosts).Error; err != nil { 61 + return nil, err 62 + } 63 + return hosts, nil 36 64 } 37 65 38 66 func (r *Relay) UpdateHostStatus(ctx context.Context, hostID uint64, status models.HostStatus) error {
+2
cmd/rerelay/relay/host_checker.go
··· 22 22 FetchAccountStatus(ctx context.Context, ident *identity.Identity) (string, error) 23 23 } 24 24 25 + var _ HostChecker = (*HostClient)(nil) 26 + 25 27 type HostClient struct { 26 28 Client *http.Client 27 29 UserAgent string
+2 -2
cmd/rerelay/relay/ingest.go
··· 93 93 return acc, nil, nil 94 94 } 95 95 96 - ident, err := r.dir.LookupDID(ctx, did) 96 + ident, err := r.Dir.LookupDID(ctx, did) 97 97 if err != nil { 98 98 // XXX: handle more granularly (eg, true NotFound vs other errors); and add tests 99 99 logger.Warn("failed to load identity") ··· 197 197 did := syntax.DID(acc.DID) 198 198 199 199 // Flush any cached DID/identity info for this user 200 - r.dir.Purge(ctx, did.AtIdentifier()) 200 + r.Dir.Purge(ctx, did.AtIdentifier()) 201 201 if err != nil { 202 202 logger.Error("problem purging identity directory cache", "err", err) 203 203 }
+2 -2
cmd/rerelay/relay/relay.go
··· 17 17 18 18 type Relay struct { 19 19 db *gorm.DB 20 - dir identity.Directory 20 + Dir identity.Directory 21 21 Logger *slog.Logger 22 22 Slurper *Slurper 23 23 Events *eventmgr.EventManager ··· 70 70 71 71 r := &Relay{ 72 72 db: db, 73 - dir: dir, 73 + Dir: dir, 74 74 Logger: slog.Default().With("system", "relay"), 75 75 Events: evtman, 76 76 HostChecker: hc,
+3 -1
cmd/rerelay/service.go
··· 140 140 e.GET("/xrpc/_health", svc.HandleHealthCheck) 141 141 142 142 e.GET("/xrpc/com.atproto.sync.subscribeRepos", svc.HandleComAtprotoSyncSubscribeRepos) 143 - 144 143 e.POST("/xrpc/com.atproto.sync.requestCrawl", svc.HandleComAtprotoSyncRequestCrawl) 144 + e.GET("/xrpc/com.atproto.sync.listHosts", svc.HandleComAtprotoSyncListHosts) 145 + e.GET("/xrpc/com.atproto.sync.getHostStatus", svc.HandleComAtprotoSyncGetHostStatus) 145 146 e.GET("/xrpc/com.atproto.sync.listRepos", svc.HandleComAtprotoSyncListRepos) 146 147 e.GET("/xrpc/com.atproto.sync.getRepo", svc.HandleComAtprotoSyncGetRepo) // just returns 3xx redirect to source PDS 148 + e.GET("/xrpc/com.atproto.sync.getRepoStatus", svc.HandleComAtprotoSyncGetRepoStatus) 147 149 e.GET("/xrpc/com.atproto.sync.getLatestCommit", svc.HandleComAtprotoSyncGetLatestCommit) 148 150 149 151 /* XXX: disabled while refactoring
+115 -90
cmd/rerelay/stubs.go
··· 1 1 package main 2 2 3 3 import ( 4 - "errors" 5 4 "fmt" 6 - "gorm.io/gorm" 7 5 "net/http" 8 6 "strconv" 9 7 10 - comatprototypes "github.com/bluesky-social/indigo/api/atproto" 8 + comatproto "github.com/bluesky-social/indigo/api/atproto" 11 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 + "github.com/bluesky-social/indigo/cmd/rerelay/relay" 11 + "github.com/bluesky-social/indigo/xrpc" 12 + 12 13 "github.com/labstack/echo/v4" 13 14 "go.opentelemetry.io/otel" 14 15 ) 15 16 16 - type XRPCError struct { 17 - Message string `json:"message"` 17 + func (s *Service) HandleComAtprotoSyncSubscribeRepos(c echo.Context) error { 18 + 19 + cursorQuery := c.QueryParam("cursor") 20 + 21 + var cursor *int64 22 + if cursorQuery != "" { 23 + cval, err := strconv.ParseInt(cursorQuery, 10, 64) 24 + if err != nil || cval < 0 { 25 + return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("cursor parameter invalid: %s", cursorQuery)}) 26 + } 27 + cursor = &cval 28 + } 29 + 30 + // pass off HTTP connection to the WebSocket handler 31 + return s.relay.HandleSubscribeRepos(c.Response(), c.Request(), cursor, c.RealIP()) 18 32 } 19 33 20 - func (s *Service) RegisterHandlersAppBsky(e *echo.Echo) error { 21 - return nil 34 + func (s *Service) HandleComAtprotoSyncRequestCrawl(c echo.Context) error { 35 + _, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncRequestCrawl") 36 + defer span.End() 37 + 38 + var body comatproto.SyncRequestCrawl_Input 39 + if err := c.Bind(&body); err != nil { 40 + return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("invalid body: %s", err)}) 41 + } 42 + 43 + // func (s *Service) handleComAtprotoSyncRequestCrawl(ctx context.Context,body *comatproto.SyncRequestCrawl_Input) error 44 + return s.handleComAtprotoSyncRequestCrawl(c, &body) 22 45 } 23 46 24 - func (s *Service) RegisterHandlersComAtproto(e *echo.Echo) error { 25 - e.GET("/xrpc/com.atproto.sync.getLatestCommit", s.HandleComAtprotoSyncGetLatestCommit) 26 - e.GET("/xrpc/com.atproto.sync.listRepos", s.HandleComAtprotoSyncListRepos) 27 - e.POST("/xrpc/com.atproto.sync.requestCrawl", s.HandleComAtprotoSyncRequestCrawl) 28 - return nil 29 - } 47 + func (s *Service) HandleComAtprotoSyncListHosts(c echo.Context) error { 48 + _, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncListHosts") 49 + defer span.End() 50 + 51 + cursorQuery := c.QueryParam("cursor") 52 + limitQuery := c.QueryParam("limit") 53 + 54 + var err error 55 + 56 + // TODO: verify limits against lexicon 57 + limit := 200 58 + if limitQuery != "" { 59 + limit, err = strconv.Atoi(limitQuery) 60 + if err != nil || limit < 1 || limit > 1000 { 61 + return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("limit parameter invalid or out of range: %s", limitQuery)}) 62 + } 63 + } 30 64 31 - func (s *Service) HandleComAtprotoSyncSubscribeRepos(c echo.Context) error { 32 - var since *int64 33 - if sinceVal := c.QueryParam("cursor"); sinceVal != "" { 34 - sval, err := strconv.ParseInt(sinceVal, 10, 64) 35 - if err != nil { 36 - return err 65 + cursor := int64(0) 66 + if cursorQuery != "" { 67 + cursor, err = strconv.ParseInt(cursorQuery, 10, 64) 68 + if err != nil || cursor < 0 { 69 + return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("cursor parameter invalid: %s", cursorQuery)}) 37 70 } 38 - since = &sval 71 + } 72 + 73 + out, handleErr := s.handleComAtprotoSyncListHosts(c, cursor, limit) 74 + if handleErr != nil { 75 + return handleErr 39 76 } 40 - return s.relay.HandleSubscribeRepos(c.Response(), c.Request(), since, c.RealIP()) 77 + return c.JSON(200, out) 41 78 } 42 79 43 - func (s *Service) HandleComAtprotoSyncGetLatestCommit(c echo.Context) error { 44 - ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetLatestCommit") 80 + func (s *Service) HandleComAtprotoSyncGetHostStatus(c echo.Context) error { 81 + _, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetHostStatus") 45 82 defer span.End() 46 - did := c.QueryParam("did") 47 83 48 - _, err := syntax.ParseDID(did) 49 - if err != nil { 50 - return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid did: %s", did)}) 51 - } 84 + hostnameQuery := c.QueryParam("hostname") 52 85 53 - var out *comatprototypes.SyncGetLatestCommit_Output 54 - var handleErr error 55 - // func (s *Service) handleComAtprotoSyncGetLatestCommit(ctx context.Context,did string) (*comatprototypes.SyncGetLatestCommit_Output, error) 56 - out, handleErr = s.handleComAtprotoSyncGetLatestCommit(ctx, did) 86 + out, handleErr := s.handleComAtprotoSyncGetHostStatus(c, hostnameQuery) 57 87 if handleErr != nil { 58 88 return handleErr 59 89 } ··· 61 91 } 62 92 63 93 func (s *Service) HandleComAtprotoSyncListRepos(c echo.Context) error { 64 - ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncListRepos") 94 + _, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncListRepos") 65 95 defer span.End() 66 96 67 97 cursorQuery := c.QueryParam("cursor") ··· 73 103 if limitQuery != "" { 74 104 limit, err = strconv.Atoi(limitQuery) 75 105 if err != nil || limit < 1 || limit > 1000 { 76 - return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid limit: %s", limitQuery)}) 106 + return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("limit parameter invalid: %s", limitQuery)}) 77 107 } 78 108 } 79 109 ··· 81 111 if cursorQuery != "" { 82 112 cursor, err = strconv.ParseInt(cursorQuery, 10, 64) 83 113 if err != nil || cursor < 0 { 84 - return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid cursor: %s", cursorQuery)}) 114 + return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("limit parameter invalid cursor: %s", cursorQuery)}) 85 115 } 86 116 } 87 117 88 - out, handleErr := s.handleComAtprotoSyncListRepos(ctx, cursor, limit) 118 + out, handleErr := s.handleComAtprotoSyncListRepos(c, cursor, limit) 89 119 if handleErr != nil { 90 120 return handleErr 91 121 } 92 122 return c.JSON(200, out) 93 123 } 94 124 95 - // HandleComAtprotoSyncGetRepo handles /xrpc/com.atproto.sync.getRepo 96 - // returns 3xx to same URL at source PDS 125 + // does a simple HTTP redirect to getRepo on the account's PDS. 126 + // 127 + // NOTE: currently does not check account status locally; a takendown account will still redirect. this saves a database lookup. 97 128 func (s *Service) HandleComAtprotoSyncGetRepo(c echo.Context) error { 98 129 ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetRepo") 99 130 defer span.End() 100 - // XXX: this is not how to fetch query params... 101 - // no request object, only params 102 - params := c.QueryParams() 103 - var did syntax.DID 104 - hasDid := false 105 - for paramName, pvl := range params { 106 - switch paramName { 107 - case "did": 108 - if len(pvl) == 1 { 109 - d, err := syntax.ParseDID(pvl[0]) 110 - if err != nil { 111 - return err // XXX: better error 112 - } 113 - did = d 114 - hasDid = true 115 - } else if len(pvl) > 1 { 116 - return c.JSON(http.StatusBadRequest, XRPCError{Message: "only allow one did param"}) 117 - } 118 - case "since": 119 - // ok 120 - default: 121 - return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid param: %s", paramName)}) 122 - } 123 - } 124 - if !hasDid { 125 - return c.JSON(http.StatusBadRequest, XRPCError{Message: "need did param"}) 131 + 132 + didQuery := c.QueryParam("did") 133 + 134 + did, err := syntax.ParseDID(didQuery) 135 + if err != nil { 136 + return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("missing or invalid DID parameter: %s", err)}) 126 137 } 127 138 128 - acc, err := s.relay.GetAccount(ctx, did) 139 + ident, err := s.relay.Dir.LookupDID(ctx, did) 129 140 if err != nil { 130 - // TODO: better error 131 - return err 141 + // TODO: could handle lookup errors more granularly 142 + return c.JSON(http.StatusNotFound, xrpc.XRPCError{ErrStr: "RepoNotFound", Message: fmt.Sprintf("could not resolve DID: %s", err)}) 132 143 } 133 - 134 - host, err := s.relay.GetHost(ctx, acc.HostID) 144 + pdsHost, _, err := relay.ParseHostname(ident.PDSEndpoint()) 135 145 if err != nil { 136 - // TODO: better error 137 - return err 146 + return c.JSON(http.StatusNotFound, xrpc.XRPCError{ErrStr: "RepoNotFound", Message: "DID document has no valid atproto PDS endpoint"}) 138 147 } 139 148 140 - // TODO: proper error responses 149 + u := c.Request().URL 150 + if u != nil { 151 + return fmt.Errorf("unexpected nil URL on request") 152 + } 153 + u.Host = pdsHost 154 + // require SSL for redirect 155 + u.Scheme = "https" 156 + // StatusFound is HTTP 302, a temporary redirect 157 + return c.Redirect(http.StatusFound, u.String()) 158 + } 159 + 160 + func (s *Service) HandleComAtprotoSyncGetRepoStatus(c echo.Context) error { 161 + _, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetRepoStatus") 162 + defer span.End() 163 + 164 + didQuery := c.QueryParam("did") 165 + 166 + did, err := syntax.ParseDID(didQuery) 141 167 if err != nil { 142 - if errors.Is(err, gorm.ErrRecordNotFound) { 143 - return c.JSON(http.StatusNotFound, XRPCError{Message: "NULL"}) 144 - } 145 - s.logger.Error("user.pds.host lookup", "err", err) 146 - return c.JSON(http.StatusInternalServerError, XRPCError{Message: "sorry"}) 168 + return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("missing or invalid DID parameter: %s", err)}) 147 169 } 148 170 149 - nextUrl := *(c.Request().URL) 150 - nextUrl.Host = host.Hostname 151 - if nextUrl.Scheme == "" { 152 - nextUrl.Scheme = "https" 171 + out, handleErr := s.handleComAtprotoSyncGetRepoStatus(c, did) 172 + if handleErr != nil { 173 + return handleErr 153 174 } 154 - return c.Redirect(http.StatusFound, nextUrl.String()) 175 + return c.JSON(200, out) 155 176 } 156 177 157 - func (s *Service) HandleComAtprotoSyncRequestCrawl(c echo.Context) error { 158 - ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncRequestCrawl") 178 + func (s *Service) HandleComAtprotoSyncGetLatestCommit(c echo.Context) error { 179 + _, span := otel.Tracer("server").Start(c.Request().Context(), "HandleComAtprotoSyncGetLatestCommit") 159 180 defer span.End() 160 181 161 - var body comatprototypes.SyncRequestCrawl_Input 162 - if err := c.Bind(&body); err != nil { 163 - return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid body: %s", err)}) 182 + didQuery := c.QueryParam("did") 183 + 184 + did, err := syntax.ParseDID(didQuery) 185 + if err != nil { 186 + return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("missing or invalid DID parameter: %s", err)}) 164 187 } 188 + 189 + var out *comatproto.SyncGetLatestCommit_Output 165 190 var handleErr error 166 - // func (s *Service) handleComAtprotoSyncRequestCrawl(ctx context.Context,body *comatprototypes.SyncRequestCrawl_Input) error 167 - handleErr = s.handleComAtprotoSyncRequestCrawl(ctx, &body) 191 + // func (s *Service) handleComAtprotoSyncGetLatestCommit(ctx context.Context,did string) (*comatproto.SyncGetLatestCommit_Output, error) 192 + out, handleErr = s.handleComAtprotoSyncGetLatestCommit(c, did) 168 193 if handleErr != nil { 169 194 return handleErr 170 195 } 171 - return nil 196 + return c.JSON(200, out) 172 197 }
+1 -1
cmd/rerelay/testing/runner.go
··· 129 129 130 130 sr := MustSimpleRelay(&dir, tmpd, s.Lenient) 131 131 132 - err = sr.Relay.SubscribeToHost(fmt.Sprintf("localhost:%d", hostPort), true, true, nil) 132 + err = sr.Relay.SubscribeToHost(fmt.Sprintf("localhost:%d", hostPort), true, true) 133 133 if err != nil { 134 134 return err 135 135 }