this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

relay host -> url

dholms 45a6f90e 7b2e3f3a

+46 -46
+9 -9
cmd/nexus/crawler.go
··· 14 14 ) 15 15 16 16 type Crawler struct { 17 - DB *gorm.DB 18 - Logger *slog.Logger 19 - RelayHost string 17 + DB *gorm.DB 18 + Logger *slog.Logger 19 + RelayUrl string 20 20 } 21 21 22 22 // EnumerateNetwork discovers and tracks all repositories on the network. ··· 30 30 Client: &http.Client{ 31 31 Timeout: 30 * time.Second, 32 32 }, 33 - Host: c.RelayHost, 33 + Host: c.RelayUrl, 34 34 } 35 35 36 36 for { ··· 74 74 cursor = *repoList.Cursor 75 75 76 76 if err := c.DB.Save(&models.ListReposCursor{ 77 - Host: c.RelayHost, 77 + Url: c.RelayUrl, 78 78 Cursor: cursor, 79 79 }).Error; err != nil { 80 80 c.Logger.Error("failed to save list repos cursor", "error", err) ··· 87 87 88 88 func (c *Crawler) getListReposCursor(ctx context.Context) (string, error) { 89 89 var dbCursor models.ListReposCursor 90 - err := c.DB.Where("host = ?", c.RelayHost).First(&dbCursor).Error 90 + err := c.DB.Where("url = ?", c.RelayUrl).First(&dbCursor).Error 91 91 if err != nil { 92 92 if err != gorm.ErrRecordNotFound { 93 93 return "", fmt.Errorf("failed to read list repos cursor: %w", err) ··· 108 108 Client: &http.Client{ 109 109 Timeout: 30 * time.Second, 110 110 }, 111 - Host: c.RelayHost, 111 + Host: c.RelayUrl, 112 112 } 113 113 114 114 for { ··· 149 149 cursor = *repoList.Cursor 150 150 151 151 if err := c.DB.Save(&models.CollectionCursor{ 152 - Host: c.RelayHost, 152 + Url: c.RelayUrl, 153 153 Collection: collection, 154 154 Cursor: cursor, 155 155 }).Error; err != nil { ··· 163 163 164 164 func (c *Crawler) getCollectionCursor(ctx context.Context, collection string) (string, error) { 165 165 var dbCursor models.CollectionCursor 166 - err := c.DB.Where("host = ? AND collection = ?", c.RelayHost, collection).First(&dbCursor).Error 166 + err := c.DB.Where("url = ? AND collection = ?", c.RelayUrl, collection).First(&dbCursor).Error 167 167 if err != nil { 168 168 if err != gorm.ErrRecordNotFound { 169 169 return "", fmt.Errorf("failed to read collection cursor: %w", err)
+6 -6
cmd/nexus/firehose.go
··· 14 14 ) 15 15 16 16 type FirehoseConsumer struct { 17 - RelayHost string 17 + RelayUrl string 18 18 Logger *slog.Logger 19 19 Parallelism int 20 20 Callbacks *events.RepoStreamCallbacks 21 - GetCursor func(ctx context.Context, relayHost string) (int64, error) 21 + GetCursor func(ctx context.Context, relayUrl string) (int64, error) 22 22 } 23 23 24 24 // Run connects to the firehose and processes events until context cancellation or error. 25 25 func (f *FirehoseConsumer) Run(ctx context.Context) error { 26 26 27 - u, err := url.Parse(f.RelayHost) 27 + u, err := url.Parse(f.RelayUrl) 28 28 if err != nil { 29 - return fmt.Errorf("invalid relayHost URI: %w", err) 29 + return fmt.Errorf("invalid relay URL: %w", err) 30 30 } 31 31 switch u.Scheme { 32 32 case "http": ··· 44 44 default: 45 45 } 46 46 47 - cursor, err := f.GetCursor(ctx, f.RelayHost) 47 + cursor, err := f.GetCursor(ctx, f.RelayUrl) 48 48 if err != nil { 49 49 return fmt.Errorf("failed to read cursor: %w", err) 50 50 } ··· 71 71 scheduler := parallel.NewScheduler( 72 72 f.Parallelism, 73 73 100, 74 - f.RelayHost, 74 + f.RelayUrl, 75 75 f.Callbacks.EventHandler, 76 76 ) 77 77 if err := events.HandleRepoStream(ctx, con, scheduler, nil); err != nil {
+5 -5
cmd/nexus/main.go
··· 36 36 EnvVars: []string{"NEXUS_DB_PATH"}, 37 37 }, 38 38 &cli.StringFlag{ 39 - Name: "relay-host", 40 - Usage: "AT Protocol relay host URL", 41 - Value: "https://bsky.network", 42 - EnvVars: []string{"NEXUS_RELAY_HOST"}, 39 + Name: "relay-url", 40 + Usage: "AT Protocol relay URL", 41 + Value: "https://relay1.us-east.bsky.network", 42 + EnvVars: []string{"NEXUS_RELAY_URL"}, 43 43 }, 44 44 &cli.StringFlag{ 45 45 Name: "bind", ··· 113 113 114 114 config := NexusConfig{ 115 115 DBPath: cctx.String("db-path"), 116 - RelayHost: cctx.String("relay-host"), 116 + RelayUrl: cctx.String("relay-url"), 117 117 FirehoseParallelism: cctx.Int("firehose-parallelism"), 118 118 ResyncParallelism: cctx.Int("resync-parallelism"), 119 119 FirehoseCursorSaveInterval: cctx.Duration("cursor-save-interval"),
+3 -3
cmd/nexus/models/models.go
··· 55 55 } 56 56 57 57 type FirehoseCursor struct { 58 - Host string `gorm:"primaryKey"` 58 + Url string `gorm:"primaryKey"` 59 59 Cursor int64 `gorm:"not null"` 60 60 } 61 61 62 62 type ListReposCursor struct { 63 - Host string `gorm:"primaryKey"` 63 + Url string `gorm:"primaryKey"` 64 64 Cursor string `gorm:"not null"` 65 65 } 66 66 67 67 type CollectionCursor struct { 68 - Host string `gorm:"primaryKey"` 68 + Url string `gorm:"primaryKey"` 69 69 Collection string `gorm:"primaryKey"` 70 70 Cursor string `gorm:"not null"` 71 71 }
+10 -10
cmd/nexus/nexus.go
··· 19 19 db *gorm.DB 20 20 logger *slog.Logger 21 21 22 - Dir identity.Directory 23 - RelayHost string 22 + Dir identity.Directory 23 + RelayUrl string 24 24 25 25 Server *NexusServer 26 26 Outbox *Outbox ··· 41 41 42 42 type NexusConfig struct { 43 43 DBPath string 44 - RelayHost string 44 + RelayUrl string 45 45 FirehoseParallelism int 46 46 ResyncParallelism int 47 47 FirehoseCursorSaveInterval time.Duration ··· 83 83 db: db, 84 84 logger: slog.Default().With("system", "nexus"), 85 85 86 - Dir: &cdir, 87 - RelayHost: config.RelayHost, 86 + Dir: &cdir, 87 + RelayUrl: config.RelayUrl, 88 88 89 89 Outbox: NewOutbox(db, outboxMode, config.WebhookURL), 90 90 ··· 116 116 Logger: n.logger.With("component", "processor"), 117 117 DB: db, 118 118 Dir: n.Dir, 119 - RelayHost: config.RelayHost, 119 + RelayUrl: config.RelayUrl, 120 120 Outbox: n.Outbox, 121 121 FullNetworkMode: config.FullNetworkMode, 122 122 CollectionFilters: config.CollectionFilters, ··· 138 138 } 139 139 140 140 n.FirehoseConsumer = &FirehoseConsumer{ 141 - RelayHost: config.RelayHost, 141 + RelayUrl: config.RelayUrl, 142 142 Logger: n.logger.With("component", "firehose"), 143 143 Parallelism: firehoseParallelism, 144 144 Callbacks: rsc, ··· 146 146 } 147 147 148 148 n.Crawler = &Crawler{ 149 - Logger: n.logger.With("component", "crawler"), 150 - DB: db, 151 - RelayHost: config.RelayHost, 149 + Logger: n.logger.With("component", "crawler"), 150 + DB: db, 151 + RelayUrl: config.RelayUrl, 152 152 } 153 153 154 154 if err := n.resetPartiallyResynced(); err != nil {
+11 -11
cmd/nexus/processor.go
··· 19 19 ) 20 20 21 21 type EventProcessor struct { 22 - Logger *slog.Logger 23 - DB *gorm.DB 24 - Dir identity.Directory 25 - RelayHost string 26 - Outbox *Outbox 22 + Logger *slog.Logger 23 + DB *gorm.DB 24 + Dir identity.Directory 25 + RelayUrl string 26 + Outbox *Outbox 27 27 28 28 FullNetworkMode bool 29 29 CollectionFilters []string ··· 477 477 } 478 478 479 479 return ep.DB.Save(&models.FirehoseCursor{ 480 - Host: ep.RelayHost, 480 + Url: ep.RelayUrl, 481 481 Cursor: seq, 482 482 }).Error 483 483 } ··· 491 491 select { 492 492 case <-ctx.Done(): 493 493 if err := ep.saveCursor(ctx); err != nil { 494 - ep.Logger.Error("failed to save cursor on shutdown", "error", err, "relayHost", ep.RelayHost) 494 + ep.Logger.Error("failed to save cursor on shutdown", "error", err, "relayUrl", ep.RelayUrl) 495 495 } 496 496 return 497 497 case <-ticker.C: 498 498 if err := ep.saveCursor(ctx); err != nil { 499 - ep.Logger.Error("failed to save cursor", "error", err, "relayHost", ep.RelayHost) 499 + ep.Logger.Error("failed to save cursor", "error", err, "relayUrl", ep.RelayUrl) 500 500 } 501 501 } 502 502 } 503 503 } 504 504 505 - func (ep *EventProcessor) ReadLastCursor(ctx context.Context, relayHost string) (int64, error) { 505 + func (ep *EventProcessor) ReadLastCursor(ctx context.Context, relayUrl string) (int64, error) { 506 506 var cursor models.FirehoseCursor 507 - if err := ep.DB.Where("host = ?", relayHost).First(&cursor).Error; err != nil { 507 + if err := ep.DB.Where("url = ?", relayUrl).First(&cursor).Error; err != nil { 508 508 if err == gorm.ErrRecordNotFound { 509 - ep.Logger.Info("no pre-existing cursor in database", "relayHost", relayHost) 509 + ep.Logger.Info("no pre-existing cursor in database", "relayUrl", relayUrl) 510 510 return 0, nil 511 511 } 512 512 return 0, err
+2 -2
cmd/nexus/server.go
··· 22 22 ns.echo = echo.New() 23 23 ns.echo.HideBanner = true 24 24 ns.echo.GET("/health", ns.handleHealthcheck) 25 - ns.echo.GET("/listen", ns.handleListen) 25 + ns.echo.GET("/channel", ns.handleChannelWebsocket) 26 26 ns.echo.POST("/add-repos", ns.handleAddRepos) 27 27 ns.echo.POST("/remove-repos", ns.handleRemoveRepos) 28 28 return ns.echo.Start(address) ··· 45 45 }, 46 46 } 47 47 48 - func (ns *NexusServer) handleListen(c echo.Context) error { 48 + func (ns *NexusServer) handleChannelWebsocket(c echo.Context) error { 49 49 if ns.Outbox.mode == OutboxModeWebhook { 50 50 return echo.NewHTTPError(http.StatusBadRequest, "websocket not available in webhook mode") 51 51 }