this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

full network mode

dholms 8546ef7e f4462440

+162 -11
+87
nexus/crawler.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "net/http" 7 + 8 + comatproto "github.com/bluesky-social/indigo/api/atproto" 9 + "github.com/bluesky-social/indigo/nexus/models" 10 + "github.com/bluesky-social/indigo/xrpc" 11 + "gorm.io/gorm" 12 + ) 13 + 14 + func (n *Nexus) EnumerateNetwork(ctx context.Context) error { 15 + cursor, err := n.getListReposCursor(ctx) 16 + if err != nil { 17 + return err 18 + } 19 + 20 + client := &xrpc.Client{ 21 + Client: &http.Client{}, 22 + Host: n.RelayHost, 23 + } 24 + 25 + for { 26 + select { 27 + case <-ctx.Done(): 28 + return ctx.Err() 29 + default: 30 + } 31 + 32 + repoList, err := comatproto.SyncListRepos(ctx, client, cursor, 1000) 33 + if err != nil { 34 + return fmt.Errorf("failed to list repos: %w", err) 35 + } 36 + 37 + repos := make([]models.Repo, 0) 38 + for _, repo := range repoList.Repos { 39 + if repo.Active != nil && *repo.Active == false { 40 + continue 41 + } 42 + repos = append(repos, models.Repo{ 43 + Did: repo.Did, 44 + State: models.RepoStatePending, 45 + Status: models.AccountStatusActive, 46 + }) 47 + } 48 + 49 + if len(repos) == 0 { 50 + break 51 + } 52 + 53 + if err := n.db.Save(&repos).Error; err != nil { 54 + n.logger.Error("failed to save repos batch", "error", err) 55 + return err 56 + } 57 + 58 + n.logger.Info("enumerated repos batch", "count", len(repos)) 59 + 60 + if repoList.Cursor == nil || *repoList.Cursor == "" { 61 + break 62 + } 63 + cursor = *repoList.Cursor 64 + 65 + if err := n.db.Save(&models.ListReposCursor{ 66 + Host: n.RelayHost, 67 + Cursor: cursor, 68 + }).Error; err != nil { 69 + n.logger.Error("failed to save lsit repos cursor", "error", err) 70 + } 71 + } 72 + 73 + n.logger.Info("network enumeration complete") 74 + return nil 75 + } 76 + 77 + func (n *Nexus) getListReposCursor(ctx context.Context) (string, error) { 78 + var dbCursor models.ListReposCursor 79 + err := n.db.Where("host = ?", n.RelayHost).First(&dbCursor).Error 80 + if err != nil { 81 + if err != gorm.ErrRecordNotFound { 82 + return "", fmt.Errorf("failed to read list repos cursor: %w", err) 83 + } 84 + return "", nil 85 + } 86 + return dbCursor.Cursor, nil 87 + }
+3 -2
nexus/main.go
··· 11 11 12 12 func main() { 13 13 nexus, err := NewNexus(NexusConfig{ 14 - DBPath: "./nexus.db", 15 - RelayHost: "wss://relay1.us-east.bsky.network", 14 + DBPath: "./nexus.db", 15 + RelayHost: "wss://relay1.us-east.bsky.network", 16 + FullNetworkMode: true, 16 17 }) 17 18 if err != nil { 18 19 log.Fatal(err)
+6 -1
nexus/models/models.go
··· 51 51 Cid string `gorm:"not null"` 52 52 } 53 53 54 - type Cursor struct { 54 + type FirehoseCursor struct { 55 55 Host string `gorm:"primaryKey"` 56 56 Cursor int64 `gorm:"not null"` 57 57 } 58 + 59 + type ListReposCursor struct { 60 + Host string `gorm:"primaryKey"` 61 + Cursor string `gorm:"not null"` 62 + }
+22 -6
nexus/nexus.go
··· 26 26 27 27 FirehoseConsumer *FirehoseConsumer 28 28 EventProcessor *EventProcessor 29 + 30 + FullNetworkMode bool 31 + RelayHost string 29 32 } 30 33 31 34 type NexusConfig struct { ··· 33 36 RelayHost string 34 37 FirehoseParallelism int 35 38 FirehoseCursorSaveInterval time.Duration 39 + FullNetworkMode bool 36 40 } 37 41 38 42 func NewNexus(config NexusConfig) (*Nexus, error) { ··· 43 47 return nil, err 44 48 } 45 49 46 - if err := db.AutoMigrate(&models.Repo{}, &models.RepoRecord{}, &models.OutboxBuffer{}, &models.ResyncBuffer{}, &models.Cursor{}); err != nil { 50 + if err := db.AutoMigrate(&models.Repo{}, &models.RepoRecord{}, &models.OutboxBuffer{}, &models.ResyncBuffer{}, &models.FirehoseCursor{}, &models.ListReposCursor{}); err != nil { 47 51 return nil, err 48 52 } 49 53 ··· 64 68 Dir: &cdir, 65 69 66 70 outbox: NewOutbox(db), 71 + 72 + FullNetworkMode: config.FullNetworkMode, 73 + RelayHost: config.RelayHost, 67 74 } 68 75 69 76 parallelism := config.FirehoseParallelism ··· 77 84 } 78 85 79 86 n.EventProcessor = &EventProcessor{ 80 - Logger: n.logger.With("component", "processor"), 81 - DB: db, 82 - Dir: n.Dir, 83 - RelayHost: config.RelayHost, 84 - Outbox: n.outbox, 87 + Logger: n.logger.With("component", "processor"), 88 + DB: db, 89 + Dir: n.Dir, 90 + RelayHost: config.RelayHost, 91 + Outbox: n.outbox, 92 + FullNetworkMode: config.FullNetworkMode, 85 93 } 86 94 87 95 rsc := &events.RepoStreamCallbacks{ ··· 110 118 // crash recovery: reset any partially repos 111 119 if err := n.resetPartiallyResynced(); err != nil { 112 120 return nil, err 121 + } 122 + 123 + if config.FullNetworkMode { 124 + go func() { 125 + if err := n.EnumerateNetwork(context.Background()); err != nil { 126 + n.logger.Error("network enumeration failed", "error", err) 127 + } 128 + }() 113 129 } 114 130 115 131 for i := 0; i < 20; i++ {
+44 -2
nexus/processor.go
··· 24 24 RelayHost string 25 25 Outbox *Outbox 26 26 27 + FullNetworkMode bool 28 + 27 29 lastSeq atomic.Int64 28 30 } 29 31 ··· 34 36 if err != nil { 35 37 return err 36 38 } else if curr == nil { 39 + if ep.FullNetworkMode { 40 + if err := ep.EnsureRepo(evt.Repo); err != nil { 41 + ep.Logger.Error("failed to auto-track repo", "did", evt.Repo, "error", err) 42 + return err 43 + } 44 + ep.Logger.Info("auto-tracked new repo from firehose", "did", evt.Repo) 45 + return nil 46 + } 37 47 return nil 38 48 } 39 49 ··· 161 171 if err != nil { 162 172 return err 163 173 } else if curr == nil { 174 + if ep.FullNetworkMode { 175 + if err := ep.EnsureRepo(evt.Did); err != nil { 176 + ep.Logger.Error("failed to auto-track repo", "did", evt.Did, "error", err) 177 + return err 178 + } 179 + ep.Logger.Info("auto-tracked new repo from firehose", "did", evt.Did) 180 + return nil 181 + } 164 182 return nil 165 183 } 166 184 ··· 201 219 if err != nil { 202 220 return err 203 221 } else if curr == nil { 222 + if ep.FullNetworkMode { 223 + if err := ep.EnsureRepo(did); err != nil { 224 + ep.Logger.Error("failed to auto-track repo", "did", did, "error", err) 225 + return err 226 + } 227 + ep.Logger.Info("auto-tracked new repo from firehose", "did", did) 228 + return nil 229 + } 204 230 return nil 205 231 } 206 232 ··· 249 275 if err != nil { 250 276 return err 251 277 } else if curr == nil { 278 + if ep.FullNetworkMode && evt.Active { 279 + if err := ep.EnsureRepo(evt.Did); err != nil { 280 + ep.Logger.Error("failed to auto-track repo", "did", evt.Did, "error", err) 281 + return err 282 + } 283 + ep.Logger.Info("auto-tracked new repo from firehose", "did", evt.Did) 284 + return nil 285 + } 252 286 return nil 253 287 } 254 288 ··· 436 470 return nil 437 471 } 438 472 439 - return ep.DB.Save(&models.Cursor{ 473 + return ep.DB.Save(&models.FirehoseCursor{ 440 474 Host: ep.RelayHost, 441 475 Cursor: seq, 442 476 }).Error ··· 462 496 } 463 497 464 498 func (ep *EventProcessor) ReadLastCursor(ctx context.Context, relayHost string) (int64, error) { 465 - var cursor models.Cursor 499 + var cursor models.FirehoseCursor 466 500 if err := ep.DB.Where("host = ?", relayHost).First(&cursor).Error; err != nil { 467 501 if err == gorm.ErrRecordNotFound { 468 502 ep.Logger.Info("no pre-existing cursor in database", "relayHost", relayHost) ··· 489 523 Where("did = ?", did). 490 524 Update("state", state).Error 491 525 } 526 + 527 + func (ep *EventProcessor) EnsureRepo(did string) error { 528 + return ep.DB.Save(&models.Repo{ 529 + Did: did, 530 + State: models.RepoStatePending, 531 + Status: models.AccountStatusActive, 532 + }).Error 533 + }