this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

slurper: cleanup lastSeq tracking

+24 -46
+24 -46
cmd/relay/relay/slurper.go
··· 88 88 LastSeq atomic.Int64 89 89 Limiters *StreamLimiters 90 90 91 - lk sync.RWMutex 92 - ctx context.Context 93 - cancel func() 91 + scheduler *parallel.Scheduler 92 + lk sync.RWMutex 93 + ctx context.Context 94 + cancel func() 94 95 } 95 96 96 - func (sub *Subscription) UpdateSeq(seq int64) { 97 - sub.LastSeq.Store(seq) 97 + // pulls lastSeq from underlying scheduler in to this Subscription 98 + func (sub *Subscription) UpdateSeq() { 99 + sub.LastSeq.Store(sub.scheduler.LastSeq()) 98 100 } 99 101 100 102 func (sub *Subscription) HostCursor() HostCursor { ··· 291 293 HandshakeTimeout: time.Second * 5, 292 294 } 293 295 294 - // HACK: cursor by 200 events to smooth over unclean shutdowns. This has been in place since 2024. 295 - if host.LastSeq > 200 { 296 - host.LastSeq -= 200 297 - } else { 298 - host.LastSeq = 0 299 - } 300 - 301 296 cursor := host.LastSeq 302 297 303 298 connectedInbound.Inc() ··· 374 369 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 375 370 logger := s.logger.With("host", sub.Hostname, "did", evt.Repo, "seq", evt.Seq, "eventType", "commit") 376 371 logger.Debug("got remote repo event") 377 - if err := s.processCallback(context.TODO(), &stream.XRPCStreamEvent{RepoCommit: evt}, sub.Hostname, sub.HostID); err != nil { 372 + if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoCommit: evt}, sub.Hostname, sub.HostID); err != nil { 378 373 logger.Error("failed handling event", "err", err) 379 374 } 380 - *lastCursor = evt.Seq 381 - 382 - sub.UpdateSeq(*lastCursor) 375 + sub.UpdateSeq() 383 376 384 377 return nil 385 378 }, 386 379 RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error { 387 380 logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "sync") 388 - logger.Debug("got remote repo event") 389 - if err := s.processCallback(context.TODO(), &stream.XRPCStreamEvent{RepoSync: evt}, sub.Hostname, sub.HostID); err != nil { 381 + logger.Debug("commit event") 382 + if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoSync: evt}, sub.Hostname, sub.HostID); err != nil { 390 383 s.logger.Error("failed handling event", "err", err) 391 384 } 392 - *lastCursor = evt.Seq 393 - 394 - sub.UpdateSeq(*lastCursor) 385 + sub.UpdateSeq() 395 386 396 387 return nil 397 388 }, 398 389 RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 399 390 logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "identity") 400 391 logger.Debug("identity event") 401 - if err := s.processCallback(context.TODO(), &stream.XRPCStreamEvent{RepoIdentity: evt}, sub.Hostname, sub.HostID); err != nil { 392 + if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoIdentity: evt}, sub.Hostname, sub.HostID); err != nil { 402 393 logger.Error("failed handling event", "err", err) 403 394 } 404 - *lastCursor = evt.Seq 405 - 406 - sub.UpdateSeq(*lastCursor) 395 + sub.UpdateSeq() 407 396 408 397 return nil 409 398 }, 410 399 RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error { 411 400 logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "account") 412 401 s.logger.Debug("account event") 413 - if err := s.processCallback(context.TODO(), &stream.XRPCStreamEvent{RepoAccount: evt}, sub.Hostname, sub.HostID); err != nil { 402 + if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoAccount: evt}, sub.Hostname, sub.HostID); err != nil { 414 403 logger.Error("failed handling event", "err", err) 415 404 } 416 - *lastCursor = evt.Seq 417 - 418 - sub.UpdateSeq(*lastCursor) 405 + sub.UpdateSeq() 419 406 420 407 return nil 421 408 }, ··· 447 434 RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { // DEPRECATED 448 435 logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "handle") 449 436 logger.Debug("got remote handle update event", "handle", evt.Handle) 450 - if err := s.processCallback(context.TODO(), &stream.XRPCStreamEvent{RepoHandle: evt}, sub.Hostname, sub.HostID); err != nil { 437 + if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoHandle: evt}, sub.Hostname, sub.HostID); err != nil { 451 438 logger.Error("failed handling event", "err", err) 452 439 } 453 - *lastCursor = evt.Seq 454 - 455 - sub.UpdateSeq(*lastCursor) 456 - 440 + sub.UpdateSeq() 457 441 return nil 458 442 }, 459 443 RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error { // DEPRECATED 460 444 logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "migrate") 461 445 logger.Debug("got remote repo migrate event", "migrateTo", evt.MigrateTo) 462 - if err := s.processCallback(context.TODO(), &stream.XRPCStreamEvent{RepoMigrate: evt}, sub.Hostname, sub.HostID); err != nil { 446 + if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoMigrate: evt}, sub.Hostname, sub.HostID); err != nil { 463 447 logger.Error("failed handling event", "err", err) 464 448 } 465 - *lastCursor = evt.Seq 466 - 467 - sub.UpdateSeq(*lastCursor) 468 - 449 + sub.UpdateSeq() 469 450 return nil 470 451 }, 471 452 RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error { // DEPRECATED 472 453 logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "tombstone") 473 454 logger.Debug("got remote repo tombstone event") 474 - if err := s.processCallback(context.TODO(), &stream.XRPCStreamEvent{RepoTombstone: evt}, sub.Hostname, sub.HostID); err != nil { 455 + if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoTombstone: evt}, sub.Hostname, sub.HostID); err != nil { 475 456 logger.Error("failed handling event", "err", err) 476 457 } 477 - *lastCursor = evt.Seq 478 - 479 - sub.UpdateSeq(*lastCursor) 480 - 458 + sub.UpdateSeq() 481 459 return nil 482 460 }, 483 461 } ··· 491 469 // NOTE: `InstrumentedRepoStreamCallbacks` is where event limiters get called/enforced 492 470 instrumentedRSC := stream.NewInstrumentedRepoStreamCallbacks(limiters, rsc.EventHandler) 493 471 494 - pool := parallel.NewScheduler( 472 + sub.scheduler = parallel.NewScheduler( 495 473 s.Config.ConcurrencyPerHost, 496 474 s.Config.QueueDepthPerHost, 497 475 conn.RemoteAddr().String(), 498 476 instrumentedRSC.EventHandler, 499 477 ) 500 478 connLogger := s.logger.With("host", sub.Hostname) 501 - return stream.HandleRepoStream(ctx, conn, pool, connLogger) 479 + return stream.HandleRepoStream(ctx, conn, sub.scheduler, connLogger) 502 480 } 503 481 504 482 type HostCursor struct {