this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

yet more relay patches and tweaks (#1051)

This is a working branch.

authored by

bnewbold and committed by
GitHub
c6066a88 786a781b

+46 -22
+16 -1
cmd/relay/README.md
··· 110 110 111 111 The relay admin interface has flexibility for many situations, but in some operational incidents it may be necessary to run SQL commands to do cleanups. This should be done when the relay is not actively operating. It is also recommended to run SQL commands in a transaction that can be rolled back in case of a typo or mistake. 112 112 113 + On the public web, you should probably run the relay behind a load-balancer or reverse proxy like `haproxy` or `caddy`, which manages TLS and can have various HTTP limits and behaviors configured. Remember that WebSocket support is required. 114 + 115 + The relay does not resolve atproto handles, but it does do DNS resolutions for hostnames, and may do a burst of resolutions at startup. Note that the go runtime may have an internal DNS implementation enabled (this is the default for the Dockerfile). The relay *will* do a large number of DID resolutions, particularly calls to the PLC directory, and particularly after a process restart when the in-process identity cache is warming up. 116 + 113 117 ### PostgreSQL 114 118 115 119 PostgreSQL is recommended for any non-trival relay deployments. Database configuration is passed via the `DATABASE_URL` environment variable, or the corresponding CLI arg. ··· 127 131 128 132 The relay is relatively easy to build and operate as as simple executable, but there is also Dockerfile in this directory. It can be used to build customized/patched versions of the relay as a container, republish them, run locally, deploy to servers, deploy to an orchestrated cluster, etc. 129 133 130 - We strongly recommend running docker in "host networking" mode when operating a full-network relay. 134 + Relays process a lot of packets, so we strongly recommend running docker in "host networking" mode when operating a full-network relay. You may also want to use something other than default docker log management (eg, `svlogd`), to handle large log volumes. 131 135 132 136 ### Bootstrapping Host List 133 137 138 + Before bulk-adding hosts, you should probably increase the "new-hosts-per-day" limit, at least temporarily. 139 + 134 140 The relay comes with a helper command to pull a list of hosts from an existing relay. You should shut the relay down first and run this as a separate command: 135 141 136 142 ./relay pull-hosts 143 + 144 + An alternative method, using `goat` and `parallel`, which is more gentle and may be better for small servers: 145 + 146 + # dump a host list using goat 147 + # 'rg' is ripgrep 148 + RELAY_HOST=https://relay1.us-west.bsky.network goat relay host list | rg '\tactive' | cut -f1 > hosts.txt 149 + 150 + # assuming that .env contains local relay configuration and admin credential 151 + shuf hosts.txt | parallel goat relay admin host add {}
+1 -1
cmd/relay/handlers.go
··· 63 63 func (s *Service) handleComAtprotoSyncListHosts(c echo.Context, cursor int64, limit int) (*comatproto.SyncListHosts_Output, error) { 64 64 ctx := c.Request().Context() 65 65 66 - hosts, err := s.relay.ListHosts(ctx, cursor, limit) 66 + hosts, err := s.relay.ListHosts(ctx, cursor, limit, true) 67 67 if err != nil { 68 68 return nil, c.JSON(http.StatusInternalServerError, xrpc.XRPCError{ErrStr: "DatabaseError", Message: "failed to list hosts"}) 69 69 }
+1 -1
cmd/relay/handlers_admin.go
··· 212 212 ctx := c.Request().Context() 213 213 214 214 limit := 10_000 215 - hosts, err := s.relay.ListHosts(ctx, 0, limit) 215 + hosts, err := s.relay.ListHosts(ctx, 0, limit, false) 216 216 if err != nil { 217 217 return err 218 218 }
+2 -1
cmd/relay/main.go
··· 105 105 Name: "default-account-limit", 106 106 Value: 100, 107 107 Usage: "max number of active accounts for new upstream hosts", 108 - EnvVars: []string{"RELAY_DEFAULT_ACCOUUNT_LIMIT", "RELAY_DEFAULT_REPO_LIMIT"}, 108 + EnvVars: []string{"RELAY_DEFAULT_ACCOUNT_LIMIT", "RELAY_DEFAULT_REPO_LIMIT"}, 109 109 }, 110 110 &cli.IntFlag{ 111 111 Name: "new-hosts-per-day-limit", ··· 148 148 &cli.StringSliceFlag{ 149 149 Name: "trusted-domains", 150 150 Usage: "domain names which mark trusted hosts; use wildcard prefix to match suffixes", 151 + Value: cli.NewStringSlice("*.host.bsky.network"), 151 152 EnvVars: []string{"RELAY_TRUSTED_DOMAINS"}, 152 153 }, 153 154 &cli.StringFlag{
+10 -3
cmd/relay/relay/host.go
··· 53 53 return &host, nil 54 54 } 55 55 56 - func (r *Relay) ListHosts(ctx context.Context, cursor int64, limit int) ([]*models.Host, error) { 56 + // If `everActive` flag is true, then only hosts which have ever had a message received (seq > 0) are returned. 57 + func (r *Relay) ListHosts(ctx context.Context, cursor int64, limit int, everActive bool) ([]*models.Host, error) { 58 + 59 + hosts := []*models.Host{} 57 60 58 61 // filters for accounts which have seen at least one event 59 62 // TODO: also filter based on status? 60 - hosts := []*models.Host{} 61 - if err := r.db.WithContext(ctx).Model(&models.Host{}).Where("id > ? AND last_seq > 0", cursor).Order("id").Limit(limit).Find(&hosts).Error; err != nil { 63 + clause := "id > ?" 64 + if everActive { 65 + clause = "id > ? AND last_seq > 0" 66 + } 67 + 68 + if err := r.db.WithContext(ctx).Model(&models.Host{}).Where(clause, cursor).Order("id").Limit(limit).Find(&hosts).Error; err != nil { 62 69 return nil, err 63 70 } 64 71 return hosts, nil
+1 -1
cmd/relay/relay/relay.go
··· 100 100 slurpConfig.PersistCursorCallback = r.PersistHostCursors 101 101 slurpConfig.PersistHostStatusCallback = r.UpdateHostStatus 102 102 103 - s, err := NewSlurper(r.processRepoEvent, slurpConfig, r.Logger) 103 + s, err := NewSlurper(r.processRepoEvent, slurpConfig) 104 104 if err != nil { 105 105 return nil, err 106 106 }
+3 -7
cmd/relay/relay/slurper.go
··· 125 125 } 126 126 } 127 127 128 - func NewSlurper(processCallback ProcessMessageFunc, config *SlurperConfig, logger *slog.Logger) (*Slurper, error) { 128 + func NewSlurper(processCallback ProcessMessageFunc, config *SlurperConfig) (*Slurper, error) { 129 129 if processCallback == nil { 130 130 return nil, fmt.Errorf("processCallback is required") 131 131 } 132 132 if config == nil { 133 133 config = DefaultSlurperConfig() 134 134 } 135 - if logger == nil { 136 - logger = slog.Default() 137 - } 138 135 139 - logger = logger.With("system", "slurper") 140 136 s := &Slurper{ 141 137 processCallback: processCallback, 142 138 Config: config, 143 139 subs: make(map[string]*Subscription), 144 140 shutdownChan: make(chan bool), 145 141 shutdownResult: make(chan error), 146 - logger: logger, 142 + logger: slog.Default().With("system", "slurper"), 147 143 } 148 144 149 145 // Start a goroutine to persist cursors (both periodically and and on shutdown) ··· 327 323 continue 328 324 } 329 325 330 - s.logger.Info("event subscription response", "code", res.StatusCode, "url", u) 326 + s.logger.Debug("event subscription response", "code", res.StatusCode, "url", u) 331 327 332 328 curCursor := cursor 333 329 if err := s.handleConnection(ctx, conn, &cursor, sub); err != nil {
+3
cmd/relay/service.go
··· 102 102 e.Use(svcutil.MetricsMiddleware) 103 103 104 104 e.HTTPErrorHandler = func(err error, c echo.Context) { 105 + if c.Response().Committed { 106 + return 107 + } 105 108 switch err := err.(type) { 106 109 case *echo.HTTPError: 107 110 if err2 := c.JSON(err.Code, map[string]any{
+5 -5
cmd/relay/stubs.go
··· 71 71 } 72 72 73 73 out, handleErr := s.handleComAtprotoSyncListHosts(c, cursor, limit) 74 - if handleErr != nil { 74 + if handleErr != nil || out == nil { 75 75 return handleErr 76 76 } 77 77 return c.JSON(200, out) ··· 84 84 hostnameQuery := c.QueryParam("hostname") 85 85 86 86 out, handleErr := s.handleComAtprotoSyncGetHostStatus(c, hostnameQuery) 87 - if handleErr != nil { 87 + if handleErr != nil || out == nil { 88 88 return handleErr 89 89 } 90 90 return c.JSON(200, out) ··· 116 116 } 117 117 118 118 out, handleErr := s.handleComAtprotoSyncListRepos(c, cursor, limit) 119 - if handleErr != nil { 119 + if handleErr != nil || out == nil { 120 120 return handleErr 121 121 } 122 122 return c.JSON(200, out) ··· 169 169 } 170 170 171 171 out, handleErr := s.handleComAtprotoSyncGetRepoStatus(c, did) 172 - if handleErr != nil { 172 + if handleErr != nil || out == nil { 173 173 return handleErr 174 174 } 175 175 return c.JSON(200, out) ··· 190 190 var handleErr error 191 191 // func (s *Service) handleComAtprotoSyncGetLatestCommit(ctx context.Context,did string) (*comatproto.SyncGetLatestCommit_Output, error) 192 192 out, handleErr = s.handleComAtprotoSyncGetLatestCommit(c, did) 193 - if handleErr != nil { 193 + if handleErr != nil || out == nil { 194 194 return handleErr 195 195 } 196 196 return c.JSON(200, out)
+4 -2
splitter/handlers.go
··· 51 51 52 52 // first forward to the upstream 53 53 xrpcc := xrpc.Client{ 54 - Client: s.upstreamClient, 55 - Host: s.conf.UpstreamHostHTTP(), 54 + Client: s.upstreamClient, 55 + Host: s.conf.UpstreamHostHTTP(), 56 + UserAgent: &s.conf.UserAgent, 56 57 } 57 58 58 59 err := comatproto.SyncRequestCrawl(ctx, &xrpcc, &body) ··· 122 123 upstreamReq.Header.Add(k, vals[0]) 123 124 } 124 125 } 126 + upstreamReq.Header.Add("User-Agent", s.conf.UserAgent) 125 127 126 128 upstreamResp, err := s.upstreamClient.Do(upstreamReq) 127 129 if err != nil {