new relay: fix bugs with host cursor, lastSeq, and status tracking (#1068)

Ended up being a bigger batch of changes than I hoped, but did also
knock off some adjacent cleanups.

The fixes:

- the key fix is to update the cursor variable in the redial loop: the
lastSeq code had mostly been refactored, but missed this bit, where the
variable was being updated via a pointer from deep in other functions.
stop passing the pointer around, and instead use the `sub.UpdateSeq()` /
`sub.LastSeq` info to determine whether the cursor made progress before looping
- we were handling FutureCursor really aggressively: resetting the
database-persisted cursor in the relay. I think this is too aggressive
for automation; a host might restart in a weird state for a minute, then
come back later with the original seq (for example). For now, the relay
just disconnects
- if we see new valid events for a host, we now mark that host as
"active" (which prevents states like "offline" from being sticky even
after successful requestCrawl and updates)
- don't pass through `newHost`, just use the `lastSeq` as indicator of
whether to connect at "current cursor" or some previous cursor
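
As a rough illustration of the first and last points above, here is a minimal, self-contained sketch of the "did the cursor make progress?" check and the cursor-based URL choice. The `subscription` type, `nextCursor`, `subscribeURL`, and the example hostname are hypothetical stand-ins for illustration only; they are not the relay's actual types or helpers.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// subscription is a hypothetical stand-in for the relay's Subscription type;
// only the atomically updated sequence number is modelled here.
type subscription struct {
	lastSeq atomic.Int64
}

// nextCursor compares the subscription's last processed sequence against the
// cursor used for the previous dial. It returns the cursor to use for the
// next dial and whether progress was made (in which case backoff can reset).
func nextCursor(sub *subscription, cursor int64) (int64, bool) {
	updated := sub.lastSeq.Load()
	if updated > cursor {
		return updated, true
	}
	return cursor, false
}

// subscribeURL appends a cursor parameter only for a positive cursor, so a
// host with no recorded sequence connects at the current stream position.
func subscribeURL(base string, cursor int64) string {
	if cursor > 0 {
		return fmt.Sprintf("%s?cursor=%d", base, cursor)
	}
	return base
}

func main() {
	sub := &subscription{}
	sub.lastSeq.Store(100)
	cursor := int64(100)

	// simulate a connection that processed events up to seq 150 before dropping
	sub.lastSeq.Store(150)

	cursor, progressed := nextCursor(sub, cursor)
	fmt.Println(cursor, progressed)
	fmt.Println(subscribeURL("wss://pds.example.com/xrpc/com.atproto.sync.subscribeRepos", cursor))
}
```

The point of this shape is that the loop reads progress from one shared, atomically updated value instead of relying on a pointer being mutated somewhere deeper in the call stack.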

authored by bnewbold and committed by GitHub
cf8f9ea6 b3f9d576

+58 -46
+3 -1
cmd/relay/HACKING.md
@@ -15,11 +15,12 @@
 - for a known host, the relay will attempt to reconnect (eg, after a drop or restart) at the last persisted sequence number. persisting should happen every few seconds, or at clean shutdown of the daemon, but it is possible for the cursor to be slightly out of sync, resulting in replay of messages
 - account-level `#commit` revisions must always increase, and these revisions are stored for every valid `#commit` or `#sync` message from the account. repeated or lower revision messages are dropped. messages with revisions corresponding to a TID "in the future" (beyond a fudge period of a few minutes) are also dropped
 - messages for an account (DID) which come from a host connection which are not the current PDS host for that account are dropped. If there is a mismatch, the relay will re-resolve the identity (DID document) and double-check before dropping the message, in case there was an account migration not reflected yet in local caches.
-- if a host sends no messages for a long period, the relay will drop the connection and set the host status to "idle"; this is common for low-traffic PDS instances (eg, handful of accounts). The expectation is that the host would then send a `requestCrawl` ping next time there is a new event.
+- (NOTE: this is not implemented yet!) if a host sends no messages for a long period, the relay should drop the connection and set the host status to "idle"; this is common for low-traffic PDS instances (eg, handful of accounts). The expectation is that the host would then send a `requestCrawl` ping next time there is a new event.
 - when the relay restarts, it connects to all "active" hosts
 - if configured with "sibling" relay instances, will forward `requestCrawl` and some administrative requests to each of those instances. The use-case is to keep a cluster of independent relays relatively synchronized in terms of hosts subscribed, takedowns, and quotas. Requests are only forwarded if processed successfully on the current instance. `User-Agent` is passed through from original request, but the `Via` header is set, and used to prevent forwarding loops. Auth headers are passed through; admin forwarding only works if the same secret works for all sibling relays. API requests forwarded to a remote rainbow instance (in front of a relay), should get proxied through to that relay successfully.
 - both the relay and rainbow set a `Server` header in HTTP responses (including WebSocket connections), and the relay checks for this header when connecting. If it finds the string `atproto-relay` in the header, it refuses the connection, to prevent relay request loops. This is just a conservative default behavior; relays consuming from other relays is allowed by protocol.
 - when connecting to remote hosts, including WebSocket subscriptions, the relay includes basic SSRF protections against connecting to private, reserved, or local IP addresses; or ports other than 80 or 443. This check is skipped if the remote host is specifically localhost (with an explicit port). If needed this constraint could be made configurable.
+- when connecting to a host (PDS), if the cursor was "in the future", the PDS will return an error frame and drop the connection. In this situation the relay will drop the connection and not automatically reconnect.


 ## Internal Implementation Details
@@ -27,6 +28,7 @@
 - the parallel event scheduler prevents multiple tasks for the same account (DID) from being processed at the same time
 - note the potential for race-conditions with messages about the same account (DID) coming from different hosts around the same time: in this case there is no guarantee about ordering
 - the relay keeps track of which events have been received-but-not-processed by sequence number, and only increments the `lastSeq` for actually-processed events. the "inflight" set of messages (sequence numbers) can grow rather large for active hosts, if there are many events for a single account (only one processed per account at a time)
+- the parallel scheduler keeps track of which events have been successfully processed. the slurper event processing code updates the cached sequence after relay processing is done, but this happens within the scheduler work scope, which means when it asks the scheduler what the highest seq is, it will never say the *current* event has been processed. this means lastSeq needs to be pulled periodically, or else is slightly out of date.


 ## Code Organization and History
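
The HACKING.md notes above on in-flight sequence tracking can be illustrated with a small sketch. This is not the relay's scheduler: `seqTracker` and its methods are hypothetical, and the rule used here (report one below the lowest in-flight sequence, or the highest received sequence when nothing is in flight) is an assumption chosen to show why a query made from inside a work item never reports the current event as processed.

```go
package main

import (
	"fmt"
	"sync"
)

// seqTracker is a hypothetical illustration of received-but-not-processed
// bookkeeping; the real scheduler may work differently.
type seqTracker struct {
	mu       sync.Mutex
	inflight map[int64]struct{} // sequence numbers received but not yet processed
	highest  int64              // highest sequence number received so far
}

func newSeqTracker() *seqTracker {
	return &seqTracker{inflight: make(map[int64]struct{})}
}

// begin records that an event has been received and handed to a worker.
func (t *seqTracker) begin(seq int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.inflight[seq] = struct{}{}
	if seq > t.highest {
		t.highest = seq
	}
}

// done records that processing of an event has finished.
func (t *seqTracker) done(seq int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.inflight, seq)
}

// lastSeq returns the highest sequence number such that it and every
// earlier received event have been processed.
func (t *seqTracker) lastSeq() int64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	if len(t.inflight) == 0 {
		return t.highest
	}
	lowest := t.highest
	for seq := range t.inflight {
		if seq < lowest {
			lowest = seq
		}
	}
	return lowest - 1
}

func main() {
	t := newSeqTracker()
	t.begin(1)
	t.begin(2)
	t.begin(3)

	t.done(2)
	fmt.Println(t.lastSeq()) // 0: seq 1 is still in flight

	t.done(1)
	// queried while seq 3 is still being worked on: reports 2, not 3
	fmt.Println(t.lastSeq())

	t.done(3)
	fmt.Println(t.lastSeq()) // 3: nothing left in flight
}
```

Under this model, a value read from within the work scope always lags by at least the current event, which is consistent with the note that lastSeq needs to be pulled periodically.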
+2 -4
cmd/relay/relay/crawl.go
@@ -15,14 +15,12 @@
 	}

 	// fetch host info from database. this query will not error if host does not yet exist
-	newHost := false
 	var host models.Host
 	if err := r.db.WithContext(ctx).Find(&host, "hostname = ?", hostname).Error; err != nil {
 		return err
 	}

 	if host.ID == 0 {
-		newHost = true

 		// check if we're over the limit for new hosts today (bypass if admin mode)
 		if !adminForce && !r.HostPerDayLimiter.Allow() {
@@ -53,7 +51,7 @@
 		return fmt.Errorf("cannot subscribe to banned pds")
 	}

-	return r.Slurper.Subscribe(&host, newHost)
+	return r.Slurper.Subscribe(&host)
 }

 // This function expects to be run when starting up, to re-connect to known active hosts
@@ -69,7 +67,7 @@
 		logger.Info("re-subscribing to active host")
 		// make a copy of host
 		host := host
-		err := r.Slurper.Subscribe(&host, false)
+		err := r.Slurper.Subscribe(&host)
 		if err != nil {
 			logger.Warn("failed to re-subscribe to host", "err", err)
 		}
+2 -2
cmd/relay/relay/host.go
@@ -121,7 +121,7 @@
 	return nil
 }

-// Persists all the host cursors in a single database transaction
+// Persists all the host cursors in a single database transaction. Also updates status to "active" for hosts which have a positive cursor.
 //
 // Note that in some situations this may have partial success.
 func (r *Relay) PersistHostCursors(ctx context.Context, cursors *[]HostCursor) error {
@@ -130,7 +130,7 @@
 		if cur.LastSeq <= 0 {
 			continue
 		}
-		if err := tx.WithContext(ctx).Model(models.Host{}).Where("id = ?", cur.HostID).UpdateColumn("last_seq", cur.LastSeq).Error; err != nil {
+		if err := tx.WithContext(ctx).Model(models.Host{}).Where("id = ?", cur.HostID).UpdateColumn("last_seq", cur.LastSeq).UpdateColumn("status", models.HostStatusActive).Error; err != nil {
 			r.Logger.Error("failed to persist host cursor", "hostID", cur.HostID, "lastSeq", cur.LastSeq)
 		}
 	}
+51 -39
cmd/relay/relay/slurper.go
@@ -2,6 +2,7 @@

 import (
 	"context"
+	"errors"
 	"fmt"
 	"log/slog"
 	"math/rand"
@@ -20,6 +21,8 @@
 	"github.com/RussellLuo/slidingwindow"
 	"github.com/gorilla/websocket"
 )
+
+var ErrFutureCursor = errors.New("host rejected future cursor")

 type ProcessMessageFunc func(ctx context.Context, evt *stream.XRPCStreamEvent, hostname string, hostID uint64) error
 type PersistCursorFunc func(ctx context.Context, cursors *[]HostCursor) error
@@ -95,12 +98,17 @@

 // pulls lastSeq from underlying scheduler in to this Subscription
 func (sub *Subscription) UpdateSeq() {
-	sub.LastSeq.Store(sub.scheduler.LastSeq())
+	// possible for this to get called before a connection has fully been set up
+	if sub.scheduler == nil {
+		return
+	}
+	seq := sub.scheduler.LastSeq()
+	if seq > 0 {
+		sub.LastSeq.Store(seq)
+	}
 }

 func (sub *Subscription) HostCursor() HostCursor {
-	sub.lk.Lock()
-	defer sub.lk.Unlock()
 	return HostCursor{
 		HostID:  sub.HostID,
 		LastSeq: sub.LastSeq.Load(),
@@ -237,8 +245,7 @@
 // high-level entry point for opening a subscription (websocket connection). This might be called when adding a new host, or when re-connecting to a previously subscribed host.
 //
 // NOTE: the `host` parameter (a database row) contains metadata about the host at a point in time. Subsequent changes to the database aren't reflected in that struct, and changes to the struct don't get persisted to database.
-func (s *Slurper) Subscribe(host *models.Host, newHost bool) error {
-	// TODO: replace newHost with a check for negative number on host.LastSeq (via IsNewHost helper method on `models.Host`?)
+func (s *Slurper) Subscribe(host *models.Host) error {
 	s.subsLk.Lock()
 	defer s.subsLk.Unlock()

@@ -268,7 +275,7 @@
 	sub.LastSeq.Store(host.LastSeq)
 	s.subs[host.Hostname] = &sub

-	go s.subscribeWithRedialer(ctx, host, &sub, newHost)
+	go s.subscribeWithRedialer(ctx, host, &sub)

 	return nil
 }
@@ -276,7 +283,9 @@
 // Main event-loop for a subscription (websocket connection to upstream host), expected to be called as a goroutine.
 //
 // On connection failure (drop or failed initial connection), will attempt re-connects, with backoff.
-func (s *Slurper) subscribeWithRedialer(ctx context.Context, host *models.Host, sub *Subscription, newHost bool) {
+func (s *Slurper) subscribeWithRedialer(ctx context.Context, host *models.Host, sub *Subscription) {
+
+	logger := s.logger.With("host", host.Hostname)
 	defer func() {
 		s.subsLk.Lock()
 		defer s.subsLk.Unlock()
@@ -309,21 +318,21 @@
 		}

 		u := host.SubscribeReposURL()
-		if !newHost && cursor > 0 {
+		if cursor > 0 {
 			u = fmt.Sprintf("%s?cursor=%d", u, cursor)
 		}
 		hdr := make(http.Header)
 		hdr.Add("User-Agent", s.Config.UserAgent)
 		conn, resp, err := d.DialContext(ctx, u, hdr)
 		if err != nil {
-			s.logger.Warn("dialing failed", "host", host.Hostname, "err", err, "backoff", backoff)
+			logger.Warn("dialing failed", "err", err, "backoff", backoff)
 			time.Sleep(sleepForBackoff(backoff))
 			backoff++

 			if backoff > 15 {
-				s.logger.Warn("host does not appear to be online, disabling for now", "host", host.Hostname)
+				logger.Warn("host does not appear to be online, disabling for now")
 				if err := s.Config.PersistHostStatusCallback(ctx, sub.HostID, models.HostStatusOffline); err != nil {
-					s.logger.Error("failed mark host as stale", "hostname", sub.Hostname, "err", err)
+					logger.Error("failed to update host status", "err", err)
 				}
 				return
 			}
@@ -334,22 +343,36 @@
 		// check if we connected to a relay (eg, this indigo relay, or rainbow) and drop if so
 		serverHdr := resp.Header.Get("Server")
 		if strings.Contains(serverHdr, "atproto-relay") {
-			s.logger.Warn("subscribed host is atproto relay of some kind, banning", "server", serverHdr, "url", u, "hostname", sub.Hostname)
+			logger.Warn("subscribed host is atproto relay of some kind, banning", "header", "Server", "value", serverHdr, "url", u)
 			if err := s.Config.PersistHostStatusCallback(ctx, sub.HostID, models.HostStatusBanned); err != nil {
-				s.logger.Error("failed mark host as banned", "hostname", sub.Hostname, "err", err)
+				logger.Error("failed to update host status", "err", err)
 			}
+			return
 		}

-		s.logger.Debug("event subscription response", "code", resp.StatusCode, "url", u)
+		logger.Debug("event subscription response", "code", resp.StatusCode, "url", u)
+
+		if err := s.handleConnection(ctx, conn, sub); err != nil {

-		curCursor := cursor
-		if err := s.handleConnection(ctx, conn, &cursor, sub); err != nil {
-			s.logger.Warn("connection to failed", "host", host.Hostname, "err", err)
 			// TODO: measure the last N connection error times and if they're coming too fast reconnect slower or don't reconnect and wait for requestCrawl
+			logger.Warn("host connection failed", "err", err, "backoff", backoff)
+
+			// for all other errors, keep retrying / reconnecting
 		}

-		if cursor > curCursor {
+		updatedCursor := sub.LastSeq.Load()
+		if updatedCursor > cursor {
+			// did we make any progress?
+			cursor = updatedCursor
 			backoff = 0
+
+			// persist updated cursor
+			if s.Config.PersistCursorCallback != nil {
+				batch := []HostCursor{sub.HostCursor()}
+				if err := s.Config.PersistCursorCallback(ctx, &batch); err != nil {
+					logger.Warn("failed to persist cursor")
+				}
+			}
 		}
 	}
 }
@@ -367,7 +390,7 @@
 }

 // Configures event processing for a websocket connection, using the parallel schedule helper library, with all events processed using the configured callback function.
-func (s *Slurper) handleConnection(ctx context.Context, conn *websocket.Conn, lastCursor *int64, sub *Subscription) error {
+func (s *Slurper) handleConnection(ctx context.Context, conn *websocket.Conn, sub *Subscription) error {
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()

@@ -413,24 +436,16 @@
 			return nil
 		},
 		Error: func(evt *stream.ErrorFrame) error {
-			s.logger.Warn("error event from upstream", "name", evt.Error, "message", evt.Message, "host", sub.Hostname)
+			logger := s.logger.With("host", sub.Hostname)
+			logger.Warn("error event from upstream", "name", evt.Error, "message", evt.Message)
 			switch evt.Error {
 			case "FutureCursor":
-				// TODO: need test coverage for this code path (including re-connect)
-				// if we get a FutureCursor frame, reset our sequence number for this host
-				if s.Config.PersistCursorCallback != nil {
-					hc := []HostCursor{HostCursor{
-						HostID:  sub.HostID,
-						LastSeq: -1, // -1 will result in "current stream" on reconnect
-					}}
-					if err := s.Config.PersistCursorCallback(context.Background(), &hc); err != nil {
-						s.logger.Error("failed to reset cursor for host which sent FutureCursor error message", "host", sub.Hostname, "err", err)
-					}
-				} else {
-					s.logger.Warn("skipping FutureCursor fix because PersistCursorCallback registered", "host", sub.Hostname)
+				if err := s.Config.PersistHostStatusCallback(ctx, sub.HostID, models.HostStatusIdle); err != nil {
+					logger.Error("failed updating host status due to future cursor", "err", err)
 				}
-				*lastCursor = 0
-				return fmt.Errorf("got FutureCursor error")
+				logger.Warn("dropping connection to host due to future cursor")
+				sub.cancel()
+				return ErrFutureCursor
 			default:
 				return fmt.Errorf("error frame: %s: %s", evt.Error, evt.Message)
 			}
@@ -505,10 +520,8 @@
 	cursors := make([]HostCursor, len(s.subs))
 	i := 0
 	for _, sub := range s.subs {
-		cursors[i] = HostCursor{
-			HostID:  sub.HostID,
-			LastSeq: sub.LastSeq.Load(),
-		}
+		sub.UpdateSeq()
+		cursors[i] = sub.HostCursor()
 		i++
 	}
 	s.subsLk.Unlock()
@@ -539,7 +552,6 @@
 		return fmt.Errorf("killing connection %q: %w", hostname, ErrHostInactive)
 	}
 	sub.cancel()
-	// cleanup in the run thread subscribeWithRedialer() will delete(s.active, host)

 	if ban && s.Config.PersistHostStatusCallback != nil {
 		if err := s.Config.PersistHostStatusCallback(ctx, sub.HostID, models.HostStatusBanned); err != nil {