this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

first pass on admin endpoints

+516 -529
+1 -1
cmd/rerelay/handlers.go
··· 15 15 "github.com/labstack/echo/v4" 16 16 ) 17 17 18 - func (s *Service) handleComAtprotoSyncRequestCrawl(c echo.Context, body *comatproto.SyncRequestCrawl_Input) error { 18 + func (s *Service) handleComAtprotoSyncRequestCrawl(c echo.Context, body *comatproto.SyncRequestCrawl_Input, admin bool) error { 19 19 ctx := c.Request().Context() 20 20 21 21 hostname, noSSL, err := relay.ParseHostname(body.Hostname)
-487
cmd/rerelay/handlers_admin.WIP
··· 1 - package main 2 - 3 - import ( 4 - "errors" 5 - "fmt" 6 - "net/http" 7 - "net/url" 8 - "slices" 9 - "strconv" 10 - "strings" 11 - 12 - "github.com/bluesky-social/indigo/cmd/rerelay/relay" 13 - "github.com/bluesky-social/indigo/cmd/rerelay/relay/slurper" 14 - 15 - "github.com/labstack/echo/v4" 16 - dto "github.com/prometheus/client_model/go" 17 - "gorm.io/gorm" 18 - ) 19 - 20 - func (svc *Service) handleAdminSetSubsEnabled(e echo.Context) error { 21 - enabled, err := strconv.ParseBool(e.QueryParam("enabled")) 22 - if err != nil { 23 - return &echo.HTTPError{ 24 - Code: 400, 25 - Message: err.Error(), 26 - } 27 - } 28 - 29 - return svc.relay.Slurper.SetNewSubsDisabled(!enabled) 30 - } 31 - 32 - func (svc *Service) handleAdminGetSubsEnabled(e echo.Context) error { 33 - return e.JSON(200, map[string]bool{ 34 - "enabled": !svc.relay.Slurper.GetNewSubsDisabledState(), 35 - }) 36 - } 37 - 38 - func (svc *Service) handleAdminGetNewPDSPerDayRateLimit(e echo.Context) error { 39 - limit := svc.relay.Slurper.GetNewPDSPerDayLimit() 40 - return e.JSON(200, map[string]int64{ 41 - "limit": limit, 42 - }) 43 - } 44 - 45 - func (svc *Service) handleAdminSetNewPDSPerDayRateLimit(e echo.Context) error { 46 - limit, err := strconv.ParseInt(e.QueryParam("limit"), 10, 64) 47 - if err != nil { 48 - return &echo.HTTPError{ 49 - Code: 400, 50 - Message: fmt.Errorf("failed to parse limit: %w", err).Error(), 51 - } 52 - } 53 - 54 - err = svc.relay.Slurper.SetNewPDSPerDayLimit(limit) 55 - if err != nil { 56 - return &echo.HTTPError{ 57 - Code: 500, 58 - Message: fmt.Errorf("failed to set new PDS per day rate limit: %w", err).Error(), 59 - } 60 - } 61 - 62 - return nil 63 - } 64 - 65 - func (svc *Service) handleAdminTakeDownRepo(e echo.Context) error { 66 - ctx := e.Request().Context() 67 - 68 - var body map[string]string 69 - if err := e.Bind(&body); err != nil { 70 - return err 71 - } 72 - did, ok := body["did"] 73 - if !ok { 74 - return &echo.HTTPError{ 75 - Code: 400, 76 - Message: "must specify did parameter in body", 77 - } 78 - } 79 - 80 - err := svc.relay.TakeDownRepo(ctx, did) 81 - if err != nil { 82 - if errors.Is(err, gorm.ErrRecordNotFound) { 83 - return &echo.HTTPError{ 84 - Code: http.StatusNotFound, 85 - Message: "repo not found", 86 - } 87 - } 88 - return &echo.HTTPError{ 89 - Code: http.StatusInternalServerError, 90 - Message: err.Error(), 91 - } 92 - } 93 - return nil 94 - } 95 - 96 - func (svc *Service) handleAdminReverseTakedown(e echo.Context) error { 97 - did := e.QueryParam("did") 98 - ctx := e.Request().Context() 99 - err := svc.relay.ReverseTakedown(ctx, did) 100 - 101 - if err != nil { 102 - if errors.Is(err, gorm.ErrRecordNotFound) { 103 - return &echo.HTTPError{ 104 - Code: http.StatusNotFound, 105 - Message: "repo not found", 106 - } 107 - } 108 - return &echo.HTTPError{ 109 - Code: http.StatusInternalServerError, 110 - Message: err.Error(), 111 - } 112 - } 113 - 114 - return nil 115 - } 116 - 117 - type ListTakedownsResponse struct { 118 - Dids []string `json:"dids"` 119 - Cursor int64 `json:"cursor,omitempty"` 120 - } 121 - 122 - func (svc *Service) handleAdminListRepoTakeDowns(e echo.Context) error { 123 - ctx := e.Request().Context() 124 - haveMinId := false 125 - minId := int64(-1) 126 - qmin := e.QueryParam("cursor") 127 - if qmin != "" { 128 - tmin, err := strconv.ParseInt(qmin, 10, 64) 129 - if err != nil { 130 - return &echo.HTTPError{Code: 400, Message: "bad cursor"} 131 - } 132 - minId = tmin 133 - haveMinId = true 134 - } 135 - limit := 1000 136 - wat := svc.db.Model(slurper.Account{}).WithContext(ctx).Select("id", "did").Where("taken_down = TRUE") 137 - if haveMinId { 138 - wat = wat.Where("id > ?", minId) 139 - } 140 - //var users []slurper.Account 141 - rows, err := wat.Order("id").Limit(limit).Rows() 142 - if err != nil { 143 - return echo.NewHTTPError(http.StatusInternalServerError, "oops").WithInternal(err) 144 - } 145 - var out ListTakedownsResponse 146 - for rows.Next() { 147 - var id int64 148 - var did string 149 - err := rows.Scan(&id, &did) 150 - if err != nil { 151 - return echo.NewHTTPError(http.StatusInternalServerError, "oops").WithInternal(err) 152 - } 153 - out.Dids = append(out.Dids, did) 154 - out.Cursor = id 155 - } 156 - if len(out.Dids) < limit { 157 - out.Cursor = 0 158 - } 159 - return e.JSON(200, out) 160 - } 161 - 162 - func (svc *Service) handleAdminGetUpstreamConns(e echo.Context) error { 163 - return e.JSON(200, svc.relay.Slurper.GetActiveList()) 164 - } 165 - 166 - type rateLimit struct { 167 - Max float64 `json:"Max"` 168 - WindowSeconds float64 `json:"Window"` 169 - } 170 - 171 - type enrichedPDS struct { 172 - slurper.PDS 173 - HasActiveConnection bool `json:"HasActiveConnection"` 174 - EventsSeenSinceStartup uint64 `json:"EventsSeenSinceStartup"` 175 - PerSecondEventRate rateLimit `json:"PerSecondEventRate"` 176 - PerHourEventRate rateLimit `json:"PerHourEventRate"` 177 - PerDayEventRate rateLimit `json:"PerDayEventRate"` 178 - UserCount int64 `json:"UserCount"` 179 - } 180 - 181 - func (svc *Service) handleListPDSs(e echo.Context) error { 182 - var pds []slurper.PDS 183 - if err := svc.db.Find(&pds).Error; err != nil { 184 - return err 185 - } 186 - 187 - enrichedPDSs := make([]enrichedPDS, len(pds)) 188 - 189 - activePDSHosts := svc.relay.Slurper.GetActiveList() 190 - 191 - for i, p := range pds { 192 - enrichedPDSs[i].PDS = p 193 - enrichedPDSs[i].HasActiveConnection = false 194 - for _, host := range activePDSHosts { 195 - if strings.ToLower(host) == strings.ToLower(p.Host) { 196 - enrichedPDSs[i].HasActiveConnection = true 197 - break 198 - } 199 - } 200 - var m = &dto.Metric{} 201 - if err := relay.EventsReceivedCounter.WithLabelValues(p.Host).Write(m); err != nil { 202 - enrichedPDSs[i].EventsSeenSinceStartup = 0 203 - continue 204 - } 205 - enrichedPDSs[i].EventsSeenSinceStartup = uint64(m.Counter.GetValue()) 206 - 207 - enrichedPDSs[i].PerSecondEventRate = rateLimit{ 208 - Max: p.RateLimit, 209 - WindowSeconds: 1, 210 - } 211 - 212 - enrichedPDSs[i].PerHourEventRate = rateLimit{ 213 - Max: float64(p.HourlyEventLimit), 214 - WindowSeconds: 3600, 215 - } 216 - 217 - enrichedPDSs[i].PerDayEventRate = rateLimit{ 218 - Max: float64(p.DailyEventLimit), 219 - WindowSeconds: 86400, 220 - } 221 - } 222 - 223 - return e.JSON(200, enrichedPDSs) 224 - } 225 - 226 - func (svc *Service) handleAdminListConsumers(e echo.Context) error { 227 - 228 - consumers := svc.relay.ListConsumers() 229 - return e.JSON(200, consumers) 230 - } 231 - 232 - func (svc *Service) handleAdminKillUpstreamConn(e echo.Context) error { 233 - host := strings.TrimSpace(e.QueryParam("host")) 234 - if host == "" { 235 - return &echo.HTTPError{ 236 - Code: 400, 237 - Message: "must pass a valid host", 238 - } 239 - } 240 - 241 - block := strings.ToLower(e.QueryParam("block")) == "true" 242 - 243 - if err := svc.relay.Slurper.KillUpstreamConnection(host, block); err != nil { 244 - if errors.Is(err, slurper.ErrNoActiveConnection) { 245 - return &echo.HTTPError{ 246 - Code: 400, 247 - Message: "no active connection to given host", 248 - } 249 - } 250 - return err 251 - } 252 - 253 - return e.JSON(200, map[string]any{ 254 - "success": "true", 255 - }) 256 - } 257 - 258 - func (svc *Service) handleBlockPDS(e echo.Context) error { 259 - host := strings.TrimSpace(e.QueryParam("host")) 260 - if host == "" { 261 - return &echo.HTTPError{ 262 - Code: 400, 263 - Message: "must pass a valid host", 264 - } 265 - } 266 - 267 - // Set the block flag to true in the DB 268 - if err := svc.db.Model(&slurper.PDS{}).Where("host = ?", host).Update("blocked", true).Error; err != nil { 269 - return err 270 - } 271 - 272 - // don't care if this errors, but we should try to disconnect something we just blocked 273 - _ = svc.relay.Slurper.KillUpstreamConnection(host, false) 274 - 275 - return e.JSON(200, map[string]any{ 276 - "success": "true", 277 - }) 278 - } 279 - 280 - func (svc *Service) handleUnblockPDS(e echo.Context) error { 281 - host := strings.TrimSpace(e.QueryParam("host")) 282 - if host == "" { 283 - return &echo.HTTPError{ 284 - Code: 400, 285 - Message: "must pass a valid host", 286 - } 287 - } 288 - 289 - // Set the block flag to false in the DB 290 - if err := svc.db.Model(&slurper.PDS{}).Where("host = ?", host).Update("blocked", false).Error; err != nil { 291 - return err 292 - } 293 - 294 - return e.JSON(200, map[string]any{ 295 - "success": "true", 296 - }) 297 - } 298 - 299 - type bannedDomains struct { 300 - BannedDomains []string `json:"banned_domains"` 301 - } 302 - 303 - func (svc *Service) handleAdminListDomainBans(c echo.Context) error { 304 - var all []slurper.DomainBan 305 - if err := svc.db.Find(&all).Error; err != nil { 306 - return err 307 - } 308 - 309 - resp := bannedDomains{ 310 - BannedDomains: []string{}, 311 - } 312 - for _, b := range all { 313 - resp.BannedDomains = append(resp.BannedDomains, b.Domain) 314 - } 315 - 316 - return c.JSON(200, resp) 317 - } 318 - 319 - type banDomainBody struct { 320 - Domain string 321 - } 322 - 323 - func (svc *Service) handleAdminBanDomain(c echo.Context) error { 324 - var body banDomainBody 325 - if err := c.Bind(&body); err != nil { 326 - return err 327 - } 328 - 329 - // Check if the domain is already banned 330 - var existing slurper.DomainBan 331 - if err := svc.db.Where("domain = ?", body.Domain).First(&existing).Error; err == nil { 332 - return &echo.HTTPError{ 333 - Code: 400, 334 - Message: "domain is already banned", 335 - } 336 - } 337 - 338 - if err := svc.db.Create(&slurper.DomainBan{ 339 - Domain: body.Domain, 340 - }).Error; err != nil { 341 - return err 342 - } 343 - 344 - return c.JSON(200, map[string]any{ 345 - "success": "true", 346 - }) 347 - } 348 - 349 - func (svc *Service) handleAdminUnbanDomain(c echo.Context) error { 350 - var body banDomainBody 351 - if err := c.Bind(&body); err != nil { 352 - return err 353 - } 354 - 355 - if err := svc.db.Where("domain = ?", body.Domain).Delete(&slurper.DomainBan{}).Error; err != nil { 356 - return err 357 - } 358 - 359 - return c.JSON(200, map[string]any{ 360 - "success": "true", 361 - }) 362 - } 363 - 364 - type RateLimitChangeRequest struct { 365 - Host string `json:"host"` 366 - slurper.PDSRates 367 - } 368 - 369 - func (svc *Service) handleAdminChangePDSRateLimits(e echo.Context) error { 370 - var body RateLimitChangeRequest 371 - if err := e.Bind(&body); err != nil { 372 - return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid body: %s", err)) 373 - } 374 - 375 - // Get the PDS from the DB 376 - var pds slurper.PDS 377 - if err := svc.db.Where("host = ?", body.Host).First(&pds).Error; err != nil { 378 - return err 379 - } 380 - 381 - // Update the rate limits in the DB 382 - pds.RateLimit = float64(body.PerSecond) 383 - pds.HourlyEventLimit = body.PerHour 384 - pds.DailyEventLimit = body.PerDay 385 - pds.RepoLimit = body.RepoLimit 386 - 387 - if err := svc.db.Save(&pds).Error; err != nil { 388 - return echo.NewHTTPError(http.StatusInternalServerError, fmt.Errorf("failed to save rate limit changes: %w", err)) 389 - } 390 - 391 - // Update the rate limit in the limiter 392 - limits := svc.relay.Slurper.GetOrCreateLimiters(pds.ID, body.PerSecond, body.PerHour, body.PerDay) 393 - limits.PerSecond.SetLimit(body.PerSecond) 394 - limits.PerHour.SetLimit(body.PerHour) 395 - limits.PerDay.SetLimit(body.PerDay) 396 - 397 - return e.JSON(200, map[string]any{ 398 - "success": "true", 399 - }) 400 - } 401 - 402 - func (svc *Service) handleAdminAddTrustedDomain(e echo.Context) error { 403 - domain := e.QueryParam("domain") 404 - if domain == "" { 405 - return fmt.Errorf("must specify domain in query parameter") 406 - } 407 - 408 - // Check if the domain is already trusted 409 - trustedDomains := svc.relay.Slurper.GetTrustedDomains() 410 - if slices.Contains(trustedDomains, domain) { 411 - return &echo.HTTPError{ 412 - Code: 400, 413 - Message: "domain is already trusted", 414 - } 415 - } 416 - 417 - if err := svc.relay.Slurper.AddTrustedDomain(domain); err != nil { 418 - return err 419 - } 420 - 421 - return e.JSON(200, map[string]any{ 422 - "success": true, 423 - }) 424 - } 425 - 426 - type AdminRequestCrawlRequest struct { 427 - Hostname string `json:"hostname"` 428 - 429 - // optional: 430 - slurper.PDSRates 431 - } 432 - 433 - func (svc *Service) handleAdminRequestCrawl(e echo.Context) error { 434 - ctx := e.Request().Context() 435 - 436 - var body AdminRequestCrawlRequest 437 - if err := e.Bind(&body); err != nil { 438 - return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid body: %s", err)) 439 - } 440 - 441 - host := body.Hostname 442 - if host == "" { 443 - return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname") 444 - } 445 - 446 - if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") { 447 - if svc.relay.Config.SSL { 448 - host = "https://" + host 449 - } else { 450 - host = "http://" + host 451 - } 452 - } 453 - 454 - u, err := url.Parse(host) 455 - if err != nil { 456 - return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname") 457 - } 458 - 459 - if u.Scheme == "http" && svc.relay.Config.SSL { 460 - return echo.NewHTTPError(http.StatusBadRequest, "this server requires https") 461 - } 462 - 463 - if u.Scheme == "https" && !svc.relay.Config.SSL { 464 - return echo.NewHTTPError(http.StatusBadRequest, "this server does not support https") 465 - } 466 - 467 - if u.Path != "" { 468 - return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without path") 469 - } 470 - 471 - if u.Query().Encode() != "" { 472 - return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without query") 473 - } 474 - 475 - host = u.Host // potentially hostname:port 476 - 477 - banned, err := svc.relay.DomainIsBanned(ctx, host) 478 - if banned { 479 - return echo.NewHTTPError(http.StatusUnauthorized, "domain is banned") 480 - } 481 - 482 - // Skip checking if the server is online for now 483 - rateOverrides := body.PDSRates 484 - rateOverrides.FromSlurper(svc.relay.Slurper) 485 - 486 - return svc.relay.Slurper.SubscribeToPds(ctx, host, true, true, &rateOverrides) // Override Trusted Domain Check 487 - }
+451
cmd/rerelay/handlers_admin.go
··· 1 + package main 2 + 3 + import ( 4 + "errors" 5 + "fmt" 6 + "net/http" 7 + "strconv" 8 + "strings" 9 + 10 + comatproto "github.com/bluesky-social/indigo/api/atproto" 11 + "github.com/bluesky-social/indigo/atproto/syntax" 12 + "github.com/bluesky-social/indigo/cmd/rerelay/relay" 13 + "github.com/bluesky-social/indigo/cmd/rerelay/relay/models" 14 + 15 + "github.com/labstack/echo/v4" 16 + dto "github.com/prometheus/client_model/go" 17 + "gorm.io/gorm" 18 + ) 19 + 20 + // this is the same as the regular com.atproto.sync.requestCrawl endpoint, except it sets a flag to bypass configuration checks 21 + func (s *Service) handleAdminRequestCrawl(c echo.Context) error { 22 + var body comatproto.SyncRequestCrawl_Input 23 + if err := c.Bind(&body); err != nil { 24 + return &echo.HTTPError{Code: http.StatusBadRequest, Message: fmt.Sprintf("invalid body: %s", err)} 25 + } 26 + 27 + // func (s *Service) handleComAtprotoSyncRequestCrawl(ctx context.Context,body *comatproto.SyncRequestCrawl_Input) error 28 + return s.handleComAtprotoSyncRequestCrawl(c, &body, true) 29 + } 30 + 31 + func (s *Service) handleAdminSetSubsEnabled(c echo.Context) error { 32 + enabled, err := strconv.ParseBool(c.QueryParam("enabled")) 33 + if err != nil { 34 + return &echo.HTTPError{Code: http.StatusBadRequest, Message: err.Error()} 35 + } 36 + s.relay.Config.DisableNewHosts = !enabled 37 + return c.JSON(http.StatusOK, map[string]any{ 38 + "success": "true", 39 + }) 40 + } 41 + 42 + func (s *Service) handleAdminGetSubsEnabled(c echo.Context) error { 43 + return c.JSON(http.StatusOK, map[string]bool{ 44 + "enabled": s.relay.Config.DisableNewHosts, 45 + }) 46 + } 47 + 48 + func (s *Service) handleAdminGetNewHostPerDayRateLimit(c echo.Context) error { 49 + return c.JSON(http.StatusOK, map[string]int64{ 50 + "limit": s.relay.Slurper.NewHostPerDayLimiter.Limit(), 51 + }) 52 + } 53 + 54 + func (s *Service) handleAdminSetNewHostPerDayRateLimit(c echo.Context) error { 55 + limit, err := strconv.ParseInt(c.QueryParam("limit"), 10, 64) 56 + if err != nil { 57 + return &echo.HTTPError{Code: http.StatusBadRequest, Message: fmt.Errorf("failed to parse limit: %w", err).Error()} 58 + } 59 + 60 + s.relay.Slurper.NewHostPerDayLimiter.SetLimit(limit) 61 + return c.JSON(http.StatusOK, map[string]any{ 62 + "success": "true", 63 + }) 64 + } 65 + 66 + func (s *Service) handleAdminTakeDownRepo(c echo.Context) error { 67 + ctx := c.Request().Context() 68 + 69 + var body map[string]string 70 + if err := c.Bind(&body); err != nil { 71 + return err 72 + } 73 + didField, ok := body["did"] 74 + if !ok { 75 + return &echo.HTTPError{ 76 + Code: http.StatusBadRequest, 77 + Message: "must specify DID parameter in body", 78 + } 79 + } 80 + did, err := syntax.ParseDID(didField) 81 + if err != nil { 82 + return err 83 + } 84 + 85 + if err := s.relay.UpdateAccountStatus(ctx, did, models.AccountStatusTakendown); err != nil { 86 + if errors.Is(err, gorm.ErrRecordNotFound) { 87 + return &echo.HTTPError{ 88 + Code: http.StatusNotFound, 89 + Message: "account not found", 90 + } 91 + } 92 + return &echo.HTTPError{ 93 + Code: http.StatusInternalServerError, 94 + Message: err.Error(), 95 + } 96 + } 97 + return c.JSON(http.StatusOK, map[string]any{ 98 + "success": "true", 99 + }) 100 + } 101 + 102 + func (s *Service) handleAdminReverseTakedown(c echo.Context) error { 103 + ctx := c.Request().Context() 104 + 105 + did, err := syntax.ParseDID(c.QueryParam("did")) 106 + if err != nil { 107 + return err 108 + } 109 + 110 + if err := s.relay.UpdateAccountStatus(ctx, did, models.AccountStatusActive); err != nil { 111 + if errors.Is(err, gorm.ErrRecordNotFound) { 112 + return &echo.HTTPError{ 113 + Code: http.StatusNotFound, 114 + Message: "repo not found", 115 + } 116 + } 117 + return &echo.HTTPError{ 118 + Code: http.StatusInternalServerError, 119 + Message: err.Error(), 120 + } 121 + } 122 + return c.JSON(http.StatusOK, map[string]any{ 123 + "success": "true", 124 + }) 125 + } 126 + 127 + type ListTakedownsResponse struct { 128 + DIDs []string `json:"dids"` 129 + Cursor int64 `json:"cursor,omitempty"` 130 + } 131 + 132 + func (s *Service) handleAdminListRepoTakeDowns(c echo.Context) error { 133 + ctx := c.Request().Context() 134 + var err error 135 + 136 + limit := 500 137 + cursor := int64(0) 138 + cursorQuery := c.QueryParam("cursor") 139 + if cursorQuery != "" { 140 + cursor, err = strconv.ParseInt(cursorQuery, 10, 64) 141 + if err != nil { 142 + return &echo.HTTPError{Code: http.StatusBadRequest, Message: "invalid cursor param"} 143 + } 144 + } 145 + 146 + accounts, err := s.relay.ListAccountTakedowns(ctx, cursor, limit) 147 + if err != nil { 148 + return &echo.HTTPError{Code: http.StatusInternalServerError, Message: "failed to list takedowns"} 149 + } 150 + 151 + out := ListTakedownsResponse{ 152 + DIDs: make([]string, len(accounts)), 153 + } 154 + for i, acc := range accounts { 155 + out.DIDs[i] = acc.DID 156 + out.Cursor = int64(acc.UID) 157 + } 158 + if len(out.DIDs) < limit { 159 + out.Cursor = 0 160 + } 161 + return c.JSON(http.StatusOK, out) 162 + } 163 + 164 + func (s *Service) handleAdminGetUpstreamConns(c echo.Context) error { 165 + return c.JSON(http.StatusOK, s.relay.Slurper.GetActiveSubHostnames()) 166 + } 167 + 168 + type rateLimit struct { 169 + Max float64 `json:"Max"` 170 + WindowSeconds float64 `json:"Window"` 171 + } 172 + 173 + type hostInfo struct { 174 + models.Host 175 + HasActiveConnection bool `json:"HasActiveConnection"` 176 + EventsSeenSinceStartup uint64 `json:"EventsSeenSinceStartup"` 177 + PerSecondEventRate rateLimit `json:"PerSecondEventRate"` 178 + PerHourEventRate rateLimit `json:"PerHourEventRate"` 179 + PerDayEventRate rateLimit `json:"PerDayEventRate"` 180 + UserCount int64 `json:"UserCount"` 181 + } 182 + 183 + func (s *Service) handleListHosts(c echo.Context) error { 184 + ctx := c.Request().Context() 185 + 186 + limit := 10_000 187 + hosts, err := s.relay.ListHosts(ctx, 0, limit) 188 + if err != nil { 189 + return err 190 + } 191 + 192 + hostInfos := make([]hostInfo, len(hosts)) 193 + 194 + var activeHosts map[string]bool 195 + activeHostnames := s.relay.Slurper.GetActiveSubHostnames() 196 + for _, hostname := range activeHostnames { 197 + activeHosts[hostname] = true 198 + } 199 + 200 + for i, host := range hosts { 201 + hostInfos[i].Host = *host 202 + _, isActive := activeHosts[host.Hostname] 203 + hostInfos[i].HasActiveConnection = isActive 204 + 205 + // pull event counter metrics from prometheus 206 + var m = &dto.Metric{} 207 + if err := relay.EventsReceivedCounter.WithLabelValues(host.Hostname).Write(m); err != nil { 208 + hostInfos[i].EventsSeenSinceStartup = 0 209 + continue 210 + } 211 + hostInfos[i].EventsSeenSinceStartup = uint64(m.Counter.GetValue()) 212 + 213 + hostInfos[i].UserCount = host.AccountCount 214 + 215 + /* XXX: compute these from account limit 216 + hostInfos[i].PerSecondEventRate = rateLimit{ 217 + Max: p.RateLimit, 218 + WindowSeconds: 1, 219 + } 220 + hostInfos[i].PerHourEventRate = rateLimit{ 221 + Max: float64(p.HourlyEventLimit), 222 + WindowSeconds: 3600, 223 + } 224 + hostInfos[i].PerDayEventRate = rateLimit{ 225 + Max: float64(p.DailyEventLimit), 226 + WindowSeconds: 86400, 227 + } 228 + */ 229 + } 230 + 231 + return c.JSON(http.StatusOK, hostInfos) 232 + } 233 + 234 + func (s *Service) handleAdminListConsumers(c echo.Context) error { 235 + return c.JSON(http.StatusOK, s.relay.ListConsumers()) 236 + } 237 + 238 + func (s *Service) handleAdminKillUpstreamConn(c echo.Context) error { 239 + queryHost := strings.TrimSpace(c.QueryParam("host")) 240 + hostname, _, err := relay.ParseHostname(queryHost) 241 + if err != nil { 242 + return &echo.HTTPError{ 243 + Code: http.StatusBadRequest, 244 + Message: "must pass a valid host", 245 + } 246 + } 247 + 248 + banHost := strings.ToLower(c.QueryParam("block")) == "true" 249 + 250 + // TODO: move this method to relay (for updating the database) 251 + if err := s.relay.Slurper.KillUpstreamConnection(hostname, banHost); err != nil { 252 + if errors.Is(err, relay.ErrNoActiveConnection) { 253 + return &echo.HTTPError{ 254 + Code: http.StatusBadRequest, 255 + Message: "no active connection to given host", 256 + } 257 + } 258 + return err 259 + } 260 + 261 + return c.JSON(http.StatusOK, map[string]any{ 262 + "success": "true", 263 + }) 264 + } 265 + 266 + func (s *Service) handleBlockHost(c echo.Context) error { 267 + ctx := c.Request().Context() 268 + 269 + queryHost := strings.TrimSpace(c.QueryParam("host")) 270 + hostname, _, err := relay.ParseHostname(queryHost) 271 + if err != nil { 272 + return &echo.HTTPError{ 273 + Code: http.StatusBadRequest, 274 + Message: "must pass a valid hostname", 275 + } 276 + } 277 + 278 + host, err := s.relay.GetHost(ctx, hostname) 279 + if err != nil { 280 + return err 281 + } 282 + 283 + if host.Status != models.HostStatusBanned { 284 + if err := s.relay.UpdateHostStatus(ctx, host.ID, models.HostStatusBanned); err != nil { 285 + return err 286 + } 287 + } 288 + 289 + // kill any active connection (there may not be one, so ignore error) 290 + _ = s.relay.Slurper.KillUpstreamConnection(host.Hostname, false) 291 + 292 + return c.JSON(http.StatusOK, map[string]any{ 293 + "success": "true", 294 + }) 295 + } 296 + 297 + func (s *Service) handleUnblockHost(c echo.Context) error { 298 + ctx := c.Request().Context() 299 + 300 + queryHost := strings.TrimSpace(c.QueryParam("host")) 301 + hostname, _, err := relay.ParseHostname(queryHost) 302 + if err != nil { 303 + return &echo.HTTPError{ 304 + Code: http.StatusBadRequest, 305 + Message: "must pass a valid hostname", 306 + } 307 + } 308 + 309 + host, err := s.relay.GetHost(ctx, hostname) 310 + if err != nil { 311 + return err 312 + } 313 + 314 + if host.Status != models.HostStatusActive { 315 + if err := s.relay.UpdateHostStatus(ctx, host.ID, models.HostStatusActive); err != nil { 316 + return err 317 + } 318 + } 319 + 320 + return c.JSON(http.StatusOK, map[string]any{ 321 + "success": "true", 322 + }) 323 + } 324 + 325 + type bannedDomains struct { 326 + BannedDomains []string `json:"banned_domains"` 327 + } 328 + 329 + func (s *Service) handleAdminListDomainBans(c echo.Context) error { 330 + ctx := c.Request().Context() 331 + 332 + bans, err := s.relay.ListDomainBans(ctx) 333 + if err != nil { 334 + return err 335 + } 336 + 337 + resp := bannedDomains{ 338 + BannedDomains: make([]string, len(bans)), 339 + } 340 + 341 + for i, ban := range bans { 342 + resp.BannedDomains[i] = ban.Domain 343 + } 344 + 345 + return c.JSON(http.StatusOK, resp) 346 + } 347 + 348 + type banDomainBody struct { 349 + Domain string 350 + } 351 + 352 + func (s *Service) handleAdminBanDomain(c echo.Context) error { 353 + ctx := c.Request().Context() 354 + 355 + var body banDomainBody 356 + if err := c.Bind(&body); err != nil { 357 + return err 358 + } 359 + 360 + err := s.relay.CreateDomainBan(ctx, body.Domain) 361 + if err != nil { 362 + return err 363 + } 364 + 365 + return c.JSON(http.StatusOK, map[string]any{ 366 + "success": "true", 367 + }) 368 + } 369 + 370 + func (s *Service) handleAdminUnbanDomain(c echo.Context) error { 371 + ctx := c.Request().Context() 372 + 373 + var body banDomainBody 374 + if err := c.Bind(&body); err != nil { 375 + return err 376 + } 377 + 378 + err := s.relay.RemoveDomainBan(ctx, body.Domain) 379 + if err != nil { 380 + return err 381 + } 382 + 383 + return c.JSON(http.StatusOK, map[string]any{ 384 + "success": "true", 385 + }) 386 + } 387 + 388 + type RateLimitChangeRequest struct { 389 + Host string `json:"host"` 390 + relay.HostRates 391 + } 392 + 393 + /* XXX: finish rate limit stuff 394 + func (s *Service) handleAdminChangeHostRateLimits(c echo.Context) error { 395 + var body RateLimitChangeRequest 396 + if err := c.Bind(&body); err != nil { 397 + return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid body: %s", err)) 398 + } 399 + 400 + var pds models.Host 401 + if err := s.db.Where("host = ?", body.Host).First(&pds).Error; err != nil { 402 + return err 403 + } 404 + 405 + // Update the rate limits in the DB 406 + pds.RateLimit = float64(body.PerSecond) 407 + pds.HourlyEventLimit = body.PerHour 408 + pds.DailyEventLimit = body.PerDay 409 + pds.RepoLimit = body.RepoLimit 410 + 411 + if err := s.db.Save(&pds).Error; err != nil { 412 + return echo.NewHTTPError(http.StatusInternalServerError, fmt.Errorf("failed to save rate limit changes: %w", err)) 413 + } 414 + 415 + // Update the rate limit in the limiter 416 + limits := s.relay.Slurper.GetOrCreateLimiters(pds.ID, body.PerSecond, body.PerHour, body.PerDay) 417 + limits.PerSecond.SetLimit(body.PerSecond) 418 + limits.PerHour.SetLimit(body.PerHour) 419 + limits.PerDay.SetLimit(body.PerDay) 420 + 421 + return c.JSON(http.StatusOK, map[string]any{ 422 + "success": "true", 423 + }) 424 + } 425 + */ 426 + 427 + /* XXX: DREPRECATED 428 + func (s *Service) handleAdminAddTrustedDomain(c echo.Context) error { 429 + domain := c.QueryParam("domain") 430 + if domain == "" { 431 + return fmt.Errorf("must specify domain in query parameter") 432 + } 433 + 434 + // Check if the domain is already trusted 435 + trustedDomains := s.relay.Slurper.GetTrustedDomains() 436 + if slices.Contains(trustedDomains, domain) { 437 + return &echo.HTTPError{ 438 + Code: http.StatusBadRequest, 439 + Message: "domain is already trusted", 440 + } 441 + } 442 + 443 + if err := s.relay.Slurper.AddTrustedDomain(domain); err != nil { 444 + return err 445 + } 446 + 447 + return c.JSON(http.StatusOK, map[string]any{ 448 + "success": true, 449 + }) 450 + } 451 + */
+9
cmd/rerelay/relay/account.go
··· 203 203 return accounts, nil 204 204 } 205 205 206 + func (r *Relay) ListAccountTakedowns(ctx context.Context, cursor int64, limit int) ([]*models.Account, error) { 207 + 208 + accounts := []*models.Account{} 209 + if err := r.db.Model(&models.Account{}).Where("uid > ? AND status = ?", cursor, models.AccountStatusTakendown).Order("uid").Limit(limit).Find(&accounts).Error; err != nil { 210 + return nil, err 211 + } 212 + return accounts, nil 213 + } 214 + 206 215 func (r *Relay) UpsertAccountRepo(uid uint64, rev syntax.TID, commitCID, commitDataCID string) error { 207 216 return r.db.Exec("INSERT INTO account_repo (uid, rev, commit_cid, commit_data) VALUES (?, ?, ?, ?) ON CONFLICT (uid) DO UPDATE SET rev = EXCLUDED.rev, commit_cid = EXCLUDED.commit_cid, commit_data = EXCLUDED.commit_data", uid, rev, commitCID, commitDataCID).Error 208 217 }
+39 -22
cmd/rerelay/relay/domain_ban.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "errors" 6 + "fmt" 5 7 "strings" 6 8 7 9 "github.com/bluesky-social/indigo/cmd/rerelay/relay/models" 10 + 11 + "gorm.io/gorm" 8 12 ) 9 13 10 - // DomainIsBanned checks if the given host is banned, starting with the host 11 - // itself, then checking every parent domain up to the tld 12 - func (r *Relay) DomainIsBanned(ctx context.Context, host string) (bool, error) { 13 - // ignore ports when checking for ban status 14 - hostport := strings.Split(host, ":") 14 + // XXX: tests for domain ban logic (which hit an actual database) 15 15 16 - segments := strings.Split(hostport[0], ".") 16 + // DomainIsBanned checks if the given hostname is banned. It checks all domain suffixs. 17 + // 18 + // Hostname is assumed to have been parsed/normalized (eg, lower-case). 19 + func (r *Relay) DomainIsBanned(ctx context.Context, hostname string) (bool, error) { 17 20 18 - // TODO: use normalize method once that merges 19 - var cleaned []string 20 - for _, s := range segments { 21 - if s == "" { 22 - continue 23 - } 24 - s = strings.ToLower(s) 21 + if strings.HasPrefix(hostname, "localhost:") { 22 + // XXX: check localhost config separately 23 + } 25 24 26 - cleaned = append(cleaned, s) 25 + // otherwise we shouldn't have a port/colon 26 + if strings.Contains(hostname, ":") { 27 + return false, fmt.Errorf("unexpected colon in hostname: %s", hostname) 27 28 } 28 - segments = cleaned 29 29 30 + // try entire host, and then all domain suffixes 31 + segments := strings.Split(hostname, ".") 30 32 for i := 0; i < len(segments)-1; i++ { 31 33 dchk := strings.Join(segments[i:], ".") 32 34 found, err := r.findDomainBan(ctx, dchk) 33 35 if err != nil { 34 36 return false, err 35 37 } 36 - 37 38 if found { 38 39 return true, nil 39 40 } ··· 41 42 return false, nil 42 43 } 43 44 44 - func (r *Relay) findDomainBan(ctx context.Context, host string) (bool, error) { 45 + func (r *Relay) findDomainBan(ctx context.Context, domain string) (bool, error) { 45 46 var ban models.DomainBan 46 - if err := r.db.Find(&ban, "domain = ?", host).Error; err != nil { 47 + if err := r.db.Model(&models.DomainBan{}).Where("domain = ?", domain).First(&ban).Error; err != nil { 48 + if errors.Is(err, gorm.ErrRecordNotFound) { 49 + return false, nil 50 + } 47 51 return false, err 48 52 } 53 + return true, nil 54 + } 49 55 50 - if ban.ID == 0 { 51 - return false, nil 52 - } 56 + func (r *Relay) CreateDomainBan(ctx context.Context, domain string) error { 57 + domainBan := models.DomainBan{Domain: domain} 58 + return r.db.Create(&domainBan).Error 59 + } 53 60 54 - return true, nil 61 + func (r *Relay) RemoveDomainBan(ctx context.Context, domain string) error { 62 + return r.db.Delete(&models.DomainBan{}, "domain = ?", domain).Error 63 + } 64 + 65 + // returns all domain bans 66 + func (r *Relay) ListDomainBans(ctx context.Context) ([]models.DomainBan, error) { 67 + bans := []models.DomainBan{} 68 + if err := r.db.Model(&models.DomainBan{}).Find(&bans).Error; err != nil { 69 + return nil, err 70 + } 71 + return bans, nil 55 72 }
+5 -6
cmd/rerelay/relay/slurper.go
··· 489 489 return err 490 490 } 491 491 492 - // TODO: called from admin endpoint 493 - func (s *Slurper) GetActiveList() []string { 492 + func (s *Slurper) GetActiveSubHostnames() []string { 494 493 s.subsLk.Lock() 495 494 defer s.subsLk.Unlock() 496 - var out []string 495 + 496 + var keys []string 497 497 for k := range s.subs { 498 - out = append(out, k) 498 + keys = append(keys, k) 499 499 } 500 - 501 - return out 500 + return keys 502 501 } 503 502 504 503 func (s *Slurper) KillUpstreamConnection(hostname string, ban bool) error {
+10 -12
cmd/rerelay/service.go
··· 148 148 e.GET("/xrpc/com.atproto.sync.getRepoStatus", svc.HandleComAtprotoSyncGetRepoStatus) 149 149 e.GET("/xrpc/com.atproto.sync.getLatestCommit", svc.HandleComAtprotoSyncGetLatestCommit) 150 150 151 - /* XXX: disabled while refactoring 152 151 admin := e.Group("/admin", svc.checkAdminAuth) 153 152 154 153 // Slurper-related Admin API 155 154 admin.GET("/subs/getUpstreamConns", svc.handleAdminGetUpstreamConns) 155 + admin.POST("/subs/killUpstream", svc.handleAdminKillUpstreamConn) 156 156 admin.GET("/subs/getEnabled", svc.handleAdminGetSubsEnabled) 157 - admin.GET("/subs/perDayLimit", svc.handleAdminGetNewPDSPerDayRateLimit) 158 157 admin.POST("/subs/setEnabled", svc.handleAdminSetSubsEnabled) 159 - admin.POST("/subs/killUpstream", svc.handleAdminKillUpstreamConn) 160 - admin.POST("/subs/setPerDayLimit", svc.handleAdminSetNewPDSPerDayRateLimit) 158 + admin.GET("/subs/perDayLimit", svc.handleAdminGetNewHostPerDayRateLimit) 159 + admin.POST("/subs/setPerDayLimit", svc.handleAdminSetNewHostPerDayRateLimit) 161 160 162 161 // Domain-related Admin API 163 162 admin.GET("/subs/listDomainBans", svc.handleAdminListDomainBans) ··· 165 164 admin.POST("/subs/unbanDomain", svc.handleAdminUnbanDomain) 166 165 167 166 // Repo-related Admin API 167 + admin.GET("/repo/takedowns", svc.handleAdminListRepoTakeDowns) 168 168 admin.POST("/repo/takeDown", svc.handleAdminTakeDownRepo) 169 169 admin.POST("/repo/reverseTakedown", svc.handleAdminReverseTakedown) 170 - admin.GET("/repo/takedowns", svc.handleAdminListRepoTakeDowns) 171 170 172 - // PDS-related Admin API 171 + // Host-related Admin API 172 + admin.GET("/pds/list", svc.handleListHosts) 173 173 admin.POST("/pds/requestCrawl", svc.handleAdminRequestCrawl) 174 - admin.GET("/pds/list", svc.handleListPDSs) 175 - admin.POST("/pds/changeLimits", svc.handleAdminChangePDSRateLimits) 176 - admin.POST("/pds/block", svc.handleBlockPDS) 177 - admin.POST("/pds/unblock", svc.handleUnblockPDS) 178 - admin.POST("/pds/addTrustedDomain", svc.handleAdminAddTrustedDomain) 174 + // TODO: admin.POST("/pds/changeLimits", svc.handleAdminChangeHostRateLimits) 175 + admin.POST("/pds/block", svc.handleBlockHost) 176 + admin.POST("/pds/unblock", svc.handleUnblockHost) 177 + // removed: admin.POST("/pds/addTrustedDomain", svc.handleAdminAddTrustedDomain) 179 178 180 179 // Consumer-related Admin API 181 180 admin.GET("/consumers/list", svc.handleAdminListConsumers) 182 - */ 183 181 184 182 // In order to support booting on random ports in tests, we need to tell the 185 183 // Echo instance it's already got a port, and then use its StartServer
+1 -1
cmd/rerelay/stubs.go
··· 41 41 } 42 42 43 43 // func (s *Service) handleComAtprotoSyncRequestCrawl(ctx context.Context,body *comatproto.SyncRequestCrawl_Input) error 44 - return s.handleComAtprotoSyncRequestCrawl(c, &body) 44 + return s.handleComAtprotoSyncRequestCrawl(c, &body, false) 45 45 } 46 46 47 47 func (s *Service) HandleComAtprotoSyncListHosts(c echo.Context) error {