this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

finish removing database from slurper

+151 -163
+2 -29
cmd/rerelay/main.go
··· 21 21 "github.com/bluesky-social/indigo/cmd/rerelay/relay" 22 22 "github.com/bluesky-social/indigo/cmd/rerelay/stream/eventmgr" 23 23 "github.com/bluesky-social/indigo/cmd/rerelay/stream/persist/diskpersist" 24 - "github.com/bluesky-social/indigo/util" 25 24 "github.com/bluesky-social/indigo/util/cliutil" 26 - "github.com/bluesky-social/indigo/xrpc" 27 25 28 26 "github.com/carlmjohnson/versioninfo" 29 27 "github.com/urfave/cli/v2" ··· 105 103 Usage: "number of concurrent worker routines per upstream host", 106 104 EnvVars: []string{"RELAY_HOST_CONCURRENCY", "RELAY_CONCURRENCY_PER_PDS"}, 107 105 Value: 100, 108 - }, 109 - &cli.IntFlag{ 110 - Name: "max-queue-per-host", 111 - Value: 1_000, 112 - Usage: "size of in-process DID (identity) cache", 113 - EnvVars: []string{"RELAY_MAX_QUEUE_PER_HOST", "RELAY_MAX_QUEUE_PER_PDS"}, 114 106 }, 115 107 &cli.IntFlag{ 116 108 Name: "default-account-limit", ··· 230 222 relayConfig := relay.DefaultRelayConfig() 231 223 relayConfig.SSL = !cctx.Bool("crawl-insecure-ws") 232 224 relayConfig.ConcurrencyPerHost = cctx.Int64("host-concurrency") 233 - relayConfig.MaxQueuePerHost = cctx.Int64("max-queue-per-host") 234 225 relayConfig.DefaultRepoLimit = cctx.Int64("default-account-limit") 235 226 ratelimitBypass := cctx.String("bsky-social-rate-limit-skip") 236 - relayConfig.ApplyHostClientSettings = makePdsClientSetup(ratelimitBypass) 227 + // TODO: actually use ratelimitBypass for host checks? 228 + _ = ratelimitBypass 237 229 nextCrawlers := cctx.StringSlice("forward-crawl-requests") 238 230 if len(nextCrawlers) > 0 { 239 231 nextCrawlerUrls := make([]*url.URL, len(nextCrawlers)) ··· 317 309 318 310 return nil 319 311 } 320 - 321 - func makePdsClientSetup(ratelimitBypass string) func(c *xrpc.Client) { 322 - return func(c *xrpc.Client) { 323 - if c.Client == nil { 324 - c.Client = util.RobustHTTPClient() 325 - } 326 - if strings.HasSuffix(c.Host, ".bsky.network") { 327 - c.Client.Timeout = time.Minute * 30 328 - if ratelimitBypass != "" { 329 - c.Headers = map[string]string{ 330 - "x-ratelimit-bypass": ratelimitBypass, 331 - } 332 - } 333 - } else { 334 - // Generic host timeout 335 - c.Client.Timeout = time.Minute * 1 336 - } 337 - } 338 - }
+19
cmd/rerelay/relay/host.go
··· 13 13 "gorm.io/gorm" 14 14 ) 15 15 16 + // XXX: GetHost (by hostname) vs GetHostByID 17 + 16 18 func (r *Relay) GetHost(ctx context.Context, hostID uint64) (*models.Host, error) { 17 19 ctx, span := tracer.Start(ctx, "getHost") 18 20 defer span.End() ··· 31 33 } 32 34 33 35 return &host, nil 36 + } 37 + 38 + func (r *Relay) UpdateHostStatus(ctx context.Context, hostID uint64, status models.HostStatus) error { 39 + return r.db.Model(models.Host{}).Where("id = ?", hostID).Update("status", status).Error 40 + } 41 + 42 + // Persists all the host cursors in a single database transaction 43 + // 44 + // Note that in some situations this may have partial success. 45 + func (r *Relay) PersistHostCursors(ctx context.Context, cursors *[]HostCursor) error { 46 + tx := r.db.WithContext(ctx).Begin() 47 + for _, cur := range *cursors { 48 + if err := tx.WithContext(ctx).Model(models.Host{}).Where("id = ?", cur.HostID).UpdateColumn("last_seq", cur.LastSeq).Error; err != nil { 49 + r.Logger.Error("failed to persist host cursor", "hostID", cur.HostID, "lastSeq", cur.LastSeq) 50 + } 51 + } 52 + return tx.WithContext(ctx).Commit().Error 34 53 } 35 54 36 55 // parses, normalizes, and validates a raw URL (HTTP or WebSocket) in to a hostname for subscriptions
+4 -3
cmd/rerelay/relay/models/models.go
··· 15 15 type HostStatus string 16 16 17 17 const ( 18 - HostStatusActive = HostStatus("active") 19 - HostStatusIdle = HostStatus("idle") 20 - HostStatusBanned = HostStatus("banned") 18 + HostStatusActive = HostStatus("active") 19 + HostStatusIdle = HostStatus("idle") 20 + HostStatusOffline = HostStatus("offline") 21 + HostStatusBanned = HostStatus("banned") 21 22 ) 22 23 23 24 type Host struct {
+14 -15
cmd/rerelay/relay/relay.go
··· 7 7 "github.com/bluesky-social/indigo/atproto/identity" 8 8 "github.com/bluesky-social/indigo/cmd/rerelay/relay/models" 9 9 "github.com/bluesky-social/indigo/cmd/rerelay/stream/eventmgr" 10 - "github.com/bluesky-social/indigo/xrpc" 11 10 12 11 "github.com/hashicorp/golang-lru/v2" 13 12 "go.opentelemetry.io/otel" ··· 40 39 } 41 40 42 41 type RelayConfig struct { 43 - SSL bool 44 - DefaultRepoLimit int64 45 - ConcurrencyPerHost int64 46 - MaxQueuePerHost int64 47 - ApplyHostClientSettings func(c *xrpc.Client) 48 - SkipAccountHostCheck bool // XXX: only used for testing 49 - LenientSyncValidation bool // XXX: wire through config 42 + SSL bool 43 + DefaultRepoLimit int64 44 + ConcurrencyPerHost int64 45 + SkipAccountHostCheck bool // XXX: only used for testing 46 + LenientSyncValidation bool // XXX: wire through config 50 47 51 48 // if true, ignore "requestCrawl" 52 49 DisableNewHosts bool ··· 58 55 SSL: true, 59 56 DefaultRepoLimit: 100, 60 57 ConcurrencyPerHost: 100, 61 - MaxQueuePerHost: 1_000, 62 58 } 63 59 } 64 60 ··· 90 86 return nil, err 91 87 } 92 88 93 - slOpts := DefaultSlurperConfig() 94 - slOpts.SSL = config.SSL 95 - slOpts.DefaultRepoLimit = config.DefaultRepoLimit 96 - slOpts.ConcurrencyPerHost = config.ConcurrencyPerHost 97 - slOpts.MaxQueuePerHost = config.MaxQueuePerHost 98 - s, err := NewSlurper(db, r.processRepoEvent, slOpts, r.Logger) 89 + // XXX: need to pass-through more relay configs 90 + slurpConfig := DefaultSlurperConfig() 91 + slurpConfig.SSL = config.SSL 92 + slurpConfig.DefaultRepoLimit = config.DefaultRepoLimit 93 + slurpConfig.ConcurrencyPerHost = config.ConcurrencyPerHost 94 + // register callbacks to persist cursors and host state in database 95 + slurpConfig.PersistCursorCallback = r.PersistHostCursors 96 + slurpConfig.PersistHostStatusCallback = r.UpdateHostStatus 97 + s, err := NewSlurper(r.processRepoEvent, slurpConfig, r.Logger) 99 98 if err != nil { 100 99 return nil, err 101 100 }
+107 -115
cmd/rerelay/relay/slurper.go
··· 16 16 17 17 "github.com/RussellLuo/slidingwindow" 18 18 "github.com/gorilla/websocket" 19 - "gorm.io/gorm" 20 19 ) 21 20 22 21 type ProcessMessageFunc func(ctx context.Context, evt *stream.XRPCStreamEvent, hostname string, hostID uint64) error 22 + type PersistCursorFunc func(ctx context.Context, cursors *[]HostCursor) error 23 + type PersistHostStatusFunc func(ctx context.Context, hostID uint64, state models.HostStatus) error 23 24 25 + // `Slurper` is the sub-system of the relay which manages active websocket firehose connections to upstream hosts. 26 + // 27 + // It enforces rate-limits, tracks cursors, and retries connections. It passes recieved messages on to the main relay via a callback function. `Slurper` does not talk to the database directly, but does have some callback to persist host state (cursors and hosting status for some error conditions). 24 28 type Slurper struct { 25 - cb ProcessMessageFunc 26 - db *gorm.DB 27 - Config *SlurperConfig 29 + processCallback ProcessMessageFunc 30 + Config *SlurperConfig 28 31 29 - lk sync.Mutex 30 - active map[string]*Subscription 32 + subsLk sync.Mutex 33 + subs map[string]*Subscription 31 34 32 35 LimitMtx sync.RWMutex 33 36 Limiters map[uint64]*Limiters ··· 35 38 NewHostPerDayLimiter *slidingwindow.Limiter 36 39 37 40 shutdownChan chan bool 38 - shutdownResult chan []error 41 + shutdownResult chan error 39 42 40 43 logger *slog.Logger 41 44 } ··· 47 50 } 48 51 49 52 type SlurperConfig struct { 50 - SSL bool 51 - DefaultPerSecondLimit int64 52 - DefaultPerHourLimit int64 53 - DefaultPerDayLimit int64 54 - DefaultRepoLimit int64 55 - ConcurrencyPerHost int64 56 - MaxQueuePerHost int64 57 - NewHostPerDayLimit int64 53 + SSL bool 54 + DefaultPerSecondLimit int64 55 + DefaultPerHourLimit int64 56 + DefaultPerDayLimit int64 57 + DefaultRepoLimit int64 58 + ConcurrencyPerHost int64 59 + NewHostPerDayLimit int64 60 + PersistCursorPeriod time.Duration 61 + PersistCursorCallback PersistCursorFunc 62 + PersistHostStatusCallback PersistHostStatusFunc 58 63 } 59 64 60 65 func DefaultSlurperConfig() *SlurperConfig { ··· 64 69 DefaultPerHourLimit: 2500, 65 70 DefaultPerDayLimit: 20_000, 66 71 DefaultRepoLimit: 100, 67 - ConcurrencyPerHost: 100, 68 - MaxQueuePerHost: 1_000, 72 + ConcurrencyPerHost: 40, 73 + PersistCursorPeriod: time.Second * 10, 69 74 } 70 75 } 71 76 ··· 87 92 sub.LastSeq = seq 88 93 } 89 94 90 - func NewSlurper(db *gorm.DB, cb ProcessMessageFunc, config *SlurperConfig, logger *slog.Logger) (*Slurper, error) { 95 + func (sub *Subscription) HostCursor() HostCursor { 96 + sub.lk.Lock() 97 + defer sub.lk.Unlock() 98 + return HostCursor{ 99 + HostID: sub.HostID, 100 + LastSeq: sub.LastSeq, 101 + } 102 + } 103 + 104 + func NewSlurper(processCallback ProcessMessageFunc, config *SlurperConfig, logger *slog.Logger) (*Slurper, error) { 91 105 if config == nil { 92 106 config = DefaultSlurperConfig() 93 107 } ··· 99 113 newHostPerDayLimiter, _ := slidingwindow.NewLimiter(time.Hour*24, config.NewHostPerDayLimit, windowFunc) 100 114 101 115 s := &Slurper{ 102 - cb: cb, 103 - db: db, 116 + processCallback: processCallback, 104 117 Config: config, 105 - active: make(map[string]*Subscription), 118 + subs: make(map[string]*Subscription), 106 119 Limiters: make(map[uint64]*Limiters), 107 120 shutdownChan: make(chan bool), 108 - shutdownResult: make(chan []error), 121 + shutdownResult: make(chan error), 109 122 NewHostPerDayLimiter: newHostPerDayLimiter, 110 123 logger: logger, 111 124 } 112 125 113 - // Start a goroutine to flush cursors to the DB every 30s 126 + // Start a goroutine to persist cursors (both periodically and and on shutdown) 114 127 go func() { 115 128 for { 116 129 select { 117 130 case <-s.shutdownChan: 118 - s.logger.Info("flushing Host cursors on shutdown") 119 - ctx := context.Background() 120 - var errs []error 121 - if errs = s.flushCursors(ctx); len(errs) > 0 { 122 - for _, err := range errs { 123 - s.logger.Error("failed to flush cursors on shutdown", "err", err) 124 - } 125 - } 126 - s.logger.Info("done flushing Host cursors on shutdown") 127 - s.shutdownResult <- errs 131 + s.logger.Info("starting shutdown host cursor flush") 132 + s.shutdownResult <- s.persistCursors(context.Background()) 128 133 return 129 - case <-time.After(time.Second * 10): 130 - s.logger.Debug("flushing Host cursors") 131 - ctx := context.Background() 132 - if errs := s.flushCursors(ctx); len(errs) > 0 { 133 - for _, err := range errs { 134 - s.logger.Error("failed to flush cursors", "err", err) 135 - } 134 + case <-time.After(config.PersistCursorPeriod): 135 + if err := s.persistCursors(context.Background()); err != nil { 136 + s.logger.Error("failed to flush cursors", "err", err) 136 137 } 137 - s.logger.Debug("done flushing Host cursors") 138 138 } 139 139 } 140 140 }() ··· 215 215 } 216 216 217 217 // Shutdown shuts down the slurper 218 - func (s *Slurper) Shutdown() []error { 218 + func (s *Slurper) Shutdown() error { 219 219 s.shutdownChan <- true 220 220 s.logger.Info("waiting for slurper shutdown") 221 - errs := <-s.shutdownResult 222 - if len(errs) > 0 { 223 - for _, err := range errs { 224 - s.logger.Error("shutdown error", "err", err) 225 - } 221 + err := <-s.shutdownResult 222 + if err != nil { 223 + s.logger.Error("shutdown error", "err", err) 226 224 } 227 225 s.logger.Info("slurper shutdown complete") 228 - return errs 226 + return err 229 227 } 230 228 231 229 func (s *Slurper) CheckIfSubscribed(hostname string) bool { 232 - s.lk.Lock() 233 - defer s.lk.Unlock() 230 + s.subsLk.Lock() 231 + defer s.subsLk.Unlock() 234 232 235 - _, ok := s.active[hostname] 233 + _, ok := s.subs[hostname] 236 234 return ok 237 235 } 238 236 239 237 func (s *Slurper) Subscribe(host *models.Host, newHost bool) error { 240 238 // TODO: for performance, lock on the hostname instead of global 241 - s.lk.Lock() 242 - defer s.lk.Unlock() 239 + s.subsLk.Lock() 240 + defer s.subsLk.Unlock() 243 241 244 242 ctx, cancel := context.WithCancel(context.Background()) 245 243 sub := Subscription{ ··· 248 246 ctx: ctx, 249 247 cancel: cancel, 250 248 } 251 - s.active[host.Hostname] = &sub 249 + s.subs[host.Hostname] = &sub 252 250 253 251 s.GetOrCreateLimiters(host.ID) 254 252 ··· 259 257 260 258 func (s *Slurper) subscribeWithRedialer(ctx context.Context, host *models.Host, sub *Subscription, newHost bool) { 261 259 defer func() { 262 - s.lk.Lock() 263 - defer s.lk.Unlock() 260 + s.subsLk.Lock() 261 + defer s.subsLk.Unlock() 264 262 265 - delete(s.active, host.Hostname) 263 + delete(s.subs, host.Hostname) 266 264 }() 267 265 268 266 d := websocket.Dialer{ ··· 302 300 303 301 if backoff > 15 { 304 302 s.logger.Warn("host does not appear to be online, disabling for now", "host", host.Hostname) 305 - if err := s.db.Model(&models.Host{}).Where("id = ?", host.ID).Update("registered", false).Error; err != nil { 306 - s.logger.Error("failed to unregister failing host", "err", err) 303 + if err := s.Config.PersistHostStatusCallback(ctx, sub.HostID, models.HostStatusOffline); err != nil { 304 + s.logger.Error("failed mark host as stale", "hostname", sub.Hostname, "err", err) 307 305 } 308 - 309 306 return 310 307 } 311 308 ··· 352 349 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 353 350 logger := s.logger.With("host", sub.Hostname, "did", evt.Repo, "seq", evt.Seq, "eventType", "commit") 354 351 logger.Debug("got remote repo event") 355 - if err := s.cb(context.TODO(), &stream.XRPCStreamEvent{RepoCommit: evt}, sub.Hostname, sub.HostID); err != nil { 352 + if err := s.processCallback(context.TODO(), &stream.XRPCStreamEvent{RepoCommit: evt}, sub.Hostname, sub.HostID); err != nil { 356 353 logger.Error("failed handling event", "err", err) 357 354 } 358 355 *lastCursor = evt.Seq ··· 364 361 RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error { 365 362 logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "sync") 366 363 logger.Debug("got remote repo event") 367 - if err := s.cb(context.TODO(), &stream.XRPCStreamEvent{RepoSync: evt}, sub.Hostname, sub.HostID); err != nil { 364 + if err := s.processCallback(context.TODO(), &stream.XRPCStreamEvent{RepoSync: evt}, sub.Hostname, sub.HostID); err != nil { 368 365 s.logger.Error("failed handling event", "err", err) 369 366 } 370 367 *lastCursor = evt.Seq ··· 376 373 RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 377 374 logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "identity") 378 375 logger.Debug("identity event") 379 - if err := s.cb(context.TODO(), &stream.XRPCStreamEvent{RepoIdentity: evt}, sub.Hostname, sub.HostID); err != nil { 376 + if err := s.processCallback(context.TODO(), &stream.XRPCStreamEvent{RepoIdentity: evt}, sub.Hostname, sub.HostID); err != nil { 380 377 logger.Error("failed handling event", "err", err) 381 378 } 382 379 *lastCursor = evt.Seq ··· 388 385 RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error { 389 386 logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "account") 390 387 s.logger.Debug("account event") 391 - if err := s.cb(context.TODO(), &stream.XRPCStreamEvent{RepoAccount: evt}, sub.Hostname, sub.HostID); err != nil { 388 + if err := s.processCallback(context.TODO(), &stream.XRPCStreamEvent{RepoAccount: evt}, sub.Hostname, sub.HostID); err != nil { 392 389 logger.Error("failed handling event", "err", err) 393 390 } 394 391 *lastCursor = evt.Seq ··· 398 395 return nil 399 396 }, 400 397 Error: func(evt *stream.ErrorFrame) error { 398 + // TODO: verbose logging 401 399 switch evt.Error { 402 400 case "FutureCursor": 403 - // XXX: need test coverage for this path 401 + // TODO: need test coverage for this code path (including re-connect) 404 402 // if we get a FutureCursor frame, reset our sequence number for this host 405 - if err := s.db.Table("host").Where("id = ?", sub.HostID).Update("last_seq", 0).Error; err != nil { 406 - return err 403 + if s.Config.PersistCursorCallback != nil { 404 + hc := []HostCursor{sub.HostCursor()} 405 + if err := s.Config.PersistCursorCallback(context.Background(), &hc); err != nil { 406 + s.logger.Error("failed to reset cursor for host which sent FutureCursor error message", "hostname", sub.Hostname, "err", err) 407 + } 408 + } else { 409 + s.logger.Warn("skipping FutureCursor fix because PersistCursorCallback registered", "hostname", sub.Hostname) 407 410 } 408 - 409 411 *lastCursor = 0 412 + // TODO: should this really return an error? 410 413 return fmt.Errorf("got FutureCursor frame, reset cursor tracking for host") 411 414 default: 412 415 return fmt.Errorf("error frame: %s: %s", evt.Error, evt.Message) ··· 419 422 RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { // DEPRECATED 420 423 logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "handle") 421 424 logger.Debug("got remote handle update event", "handle", evt.Handle) 422 - if err := s.cb(context.TODO(), &stream.XRPCStreamEvent{RepoHandle: evt}, sub.Hostname, sub.HostID); err != nil { 425 + if err := s.processCallback(context.TODO(), &stream.XRPCStreamEvent{RepoHandle: evt}, sub.Hostname, sub.HostID); err != nil { 423 426 logger.Error("failed handling event", "err", err) 424 427 } 425 428 *lastCursor = evt.Seq ··· 431 434 RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error { // DEPRECATED 432 435 logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "migrate") 433 436 logger.Debug("got remote repo migrate event", "migrateTo", evt.MigrateTo) 434 - if err := s.cb(context.TODO(), &stream.XRPCStreamEvent{RepoMigrate: evt}, sub.Hostname, sub.HostID); err != nil { 437 + if err := s.processCallback(context.TODO(), &stream.XRPCStreamEvent{RepoMigrate: evt}, sub.Hostname, sub.HostID); err != nil { 435 438 logger.Error("failed handling event", "err", err) 436 439 } 437 440 *lastCursor = evt.Seq ··· 443 446 RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error { // DEPRECATED 444 447 logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "tombstone") 445 448 logger.Debug("got remote repo tombstone event") 446 - if err := s.cb(context.TODO(), &stream.XRPCStreamEvent{RepoTombstone: evt}, sub.Hostname, sub.HostID); err != nil { 449 + if err := s.processCallback(context.TODO(), &stream.XRPCStreamEvent{RepoTombstone: evt}, sub.Hostname, sub.HostID); err != nil { 447 450 logger.Error("failed handling event", "err", err) 448 451 } 449 452 *lastCursor = evt.Seq ··· 473 476 return stream.HandleRepoStream(ctx, conn, pool, nil) 474 477 } 475 478 476 - type cursorSnapshot struct { 477 - hostID uint64 478 - cursor int64 479 + type HostCursor struct { 480 + HostID uint64 481 + LastSeq int64 479 482 } 480 483 481 - // flushCursors updates the Host cursors in the DB for all active subscriptions 482 - func (s *Slurper) flushCursors(ctx context.Context) []error { 484 + // persistCursors sends all cursors to callback to be persisted in database (if registered) 485 + func (s *Slurper) persistCursors(ctx context.Context) error { 486 + if s.Config.PersistCursorCallback != nil { 487 + s.logger.Warn("skipping cursor persist because no PersistCursorCallback registered") 488 + return nil 489 + } 483 490 start := time.Now() 484 - //ctx, span := otel.Tracer("feedmgr").Start(ctx, "flushCursors") 485 - //defer span.End() 486 491 487 - var cursors []cursorSnapshot 488 - 489 - s.lk.Lock() 490 - // Iterate over active subs and copy the current cursor 491 - for _, sub := range s.active { 492 + // gather cursors: lock overall set, then lock each individual subscription while gathering 493 + s.subsLk.Lock() 494 + cursors := make([]HostCursor, len(s.subs)) 495 + i := 0 496 + for _, sub := range s.subs { 492 497 sub.lk.RLock() 493 - cursors = append(cursors, cursorSnapshot{ 494 - hostID: sub.HostID, 495 - cursor: sub.LastSeq, 496 - }) 498 + cursors[i] = HostCursor{ 499 + HostID: sub.HostID, 500 + LastSeq: sub.LastSeq, 501 + } 497 502 sub.lk.RUnlock() 503 + i++ 498 504 } 499 - s.lk.Unlock() 500 - 501 - errs := []error{} 502 - okcount := 0 505 + s.subsLk.Unlock() 503 506 504 - tx := s.db.WithContext(ctx).Begin() 505 - for _, cursor := range cursors { 506 - if err := tx.WithContext(ctx).Model(models.Host{}).Where("id = ?", cursor.hostID).UpdateColumn("last_seq", cursor.cursor).Error; err != nil { 507 - errs = append(errs, err) 508 - } else { 509 - okcount++ 510 - } 511 - } 512 - if err := tx.WithContext(ctx).Commit().Error; err != nil { 513 - errs = append(errs, err) 514 - } 515 - dt := time.Since(start) 516 - s.logger.Info("flushCursors", "dt", dt, "ok", okcount, "errs", len(errs)) 517 - 518 - return errs 507 + err := s.Config.PersistCursorCallback(ctx, &cursors) 508 + s.logger.Info("finished persisting cursors", "count", len(cursors), "duration", time.Since(start), "err", err) 509 + return err 519 510 } 520 511 512 + // TODO: called from admin endpoint 521 513 func (s *Slurper) GetActiveList() []string { 522 - s.lk.Lock() 523 - defer s.lk.Unlock() 514 + s.subsLk.Lock() 515 + defer s.subsLk.Unlock() 524 516 var out []string 525 - for k := range s.active { 517 + for k := range s.subs { 526 518 out = append(out, k) 527 519 } 528 520 ··· 530 522 } 531 523 532 524 func (s *Slurper) KillUpstreamConnection(hostname string, ban bool) error { 533 - s.lk.Lock() 534 - defer s.lk.Unlock() 525 + s.subsLk.Lock() 526 + defer s.subsLk.Unlock() 535 527 536 - ac, ok := s.active[hostname] 528 + sub, ok := s.subs[hostname] 537 529 if !ok { 538 530 return fmt.Errorf("killing connection %q: %w", hostname, ErrNoActiveConnection) 539 531 } 540 - ac.cancel() 532 + sub.cancel() 541 533 // cleanup in the run thread subscribeWithRedialer() will delete(s.active, host) 542 534 543 - if ban { 544 - if err := s.db.Model(models.Host{}).Where("id = ?", ac.HostID).UpdateColumn("status", "banned").Error; err != nil { 535 + if ban && s.Config.PersistHostStatusCallback != nil { 536 + if err := s.Config.PersistHostStatusCallback(context.TODO(), sub.HostID, models.HostStatusBanned); err != nil { 545 537 return fmt.Errorf("failed to set host as banned: %w", err) 546 538 } 547 539 }
+5 -1
cmd/rerelay/service.go
··· 183 183 // method to re-use that listener. 184 184 e.Listener = listen 185 185 srv := &http.Server{} 186 + // TODO: attach echo to Service, for shutdown? 186 187 return e.StartServer(srv) 187 188 } 188 189 189 190 func (svc *Service) Shutdown() []error { 190 - errs := svc.relay.Slurper.Shutdown() 191 + var errs []error 192 + if err := svc.relay.Slurper.Shutdown(); err != nil { 193 + errs = append(errs, err) 194 + } 191 195 192 196 if err := svc.relay.Events.Shutdown(context.TODO()); err != nil { 193 197 errs = append(errs, err)