this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

bootstrap by signalling collection

dholms 8bd90231 4407d314

+89 -3
+73
nexus/crawler.go
··· 85 85 } 86 86 return dbCursor.Cursor, nil 87 87 } 88 + 89 + func (n *Nexus) EnumerateNetworkByCollection(ctx context.Context, collection string) error { 90 + cursor, err := n.getCollectionCursor(ctx, collection) 91 + if err != nil { 92 + return err 93 + } 94 + 95 + client := &xrpc.Client{ 96 + Client: &http.Client{}, 97 + Host: n.RelayHost, 98 + } 99 + 100 + for { 101 + select { 102 + case <-ctx.Done(): 103 + return ctx.Err() 104 + default: 105 + } 106 + 107 + repoList, err := comatproto.SyncListReposByCollection(ctx, client, collection, cursor, 1000) 108 + if err != nil { 109 + return fmt.Errorf("failed to list repos by collection: %w", err) 110 + } 111 + 112 + repos := make([]models.Repo, 0) 113 + for _, repo := range repoList.Repos { 114 + repos = append(repos, models.Repo{ 115 + Did: repo.Did, 116 + State: models.RepoStatePending, 117 + Status: models.AccountStatusActive, 118 + }) 119 + } 120 + 121 + if len(repos) == 0 { 122 + break 123 + } 124 + 125 + if err := n.db.Save(&repos).Error; err != nil { 126 + n.logger.Error("failed to save repos batch", "error", err) 127 + return err 128 + } 129 + 130 + n.logger.Info("enumerated repos by collection batch", "collection", collection, "count", len(repos)) 131 + 132 + if repoList.Cursor == nil || *repoList.Cursor == "" { 133 + break 134 + } 135 + cursor = *repoList.Cursor 136 + 137 + if err := n.db.Save(&models.CollectionCursor{ 138 + Host: n.RelayHost, 139 + Collection: collection, 140 + Cursor: cursor, 141 + }).Error; err != nil { 142 + n.logger.Error("failed to save collection cursor", "error", err) 143 + } 144 + } 145 + 146 + n.logger.Info("collection enumeration complete", "collection", collection) 147 + return nil 148 + } 149 + 150 + func (n *Nexus) getCollectionCursor(ctx context.Context, collection string) (string, error) { 151 + var dbCursor models.CollectionCursor 152 + err := n.db.Where("host = ? AND collection = ?", n.RelayHost, collection).First(&dbCursor).Error 153 + if err != nil { 154 + if err != gorm.ErrRecordNotFound { 155 + return "", fmt.Errorf("failed to read collection cursor: %w", err) 156 + } 157 + return "", nil 158 + } 159 + return dbCursor.Cursor, nil 160 + }
+6
nexus/models/models.go
··· 63 63 Host string `gorm:"primaryKey"` 64 64 Cursor string `gorm:"not null"` 65 65 } 66 + 67 + type CollectionCursor struct { 68 + Host string `gorm:"primaryKey"` 69 + Collection string `gorm:"primaryKey"` 70 + Cursor string `gorm:"not null"` 71 + }
+10 -3
nexus/nexus.go
··· 44 44 ResyncParallelism int 45 45 FirehoseCursorSaveInterval time.Duration 46 46 FullNetworkMode bool 47 + SignalCollection string 47 48 DisableAcks bool 48 49 WebhookURL string 49 50 CollectionFilters []string // e.g., ["app.bsky.feed.post", "app.bsky.graph.*"] 50 51 } 51 52 52 53 func NewNexus(config NexusConfig) (*Nexus, error) { 53 - db, err := gorm.Open(sqlite.Open(config.DBPath+"?_journal_mode=WAL&_busy_timeout=10000"), &gorm.Config{ 54 + db, err := gorm.Open(sqlite.Open(config.DBPath+"?_journal_mode=WAL&_busy_timeout=10000&_synchronous=NORMAL&_temp_store=MEMORY&_cache_size=-64000&_wal_autocheckpoint=3000"), &gorm.Config{ 54 55 Logger: logger.Default.LogMode(logger.Silent), 55 56 }) 56 57 if err != nil { 57 58 return nil, err 58 59 } 59 60 60 - if err := db.AutoMigrate(&models.Repo{}, &models.RepoRecord{}, &models.OutboxBuffer{}, &models.ResyncBuffer{}, &models.FirehoseCursor{}, &models.ListReposCursor{}); err != nil { 61 + if err := db.AutoMigrate(&models.Repo{}, &models.RepoRecord{}, &models.OutboxBuffer{}, &models.ResyncBuffer{}, &models.FirehoseCursor{}, &models.ListReposCursor{}, &models.CollectionCursor{}); err != nil { 61 62 return nil, err 62 63 } 63 64 ··· 143 144 return nil, err 144 145 } 145 146 146 - if config.FullNetworkMode { 147 + if config.SignalCollection != "" { 148 + go func() { 149 + if err := n.EnumerateNetworkByCollection(context.Background(), config.SignalCollection); err != nil { 150 + n.logger.Error("collection enumeration failed", "error", err, "collection", config.SignalCollection) 151 + } 152 + }() 153 + } else if config.FullNetworkMode { 147 154 go func() { 148 155 if err := n.EnumerateNetwork(context.Background()); err != nil { 149 156 n.logger.Error("network enumeration failed", "error", err)