this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

move to cli structure & refactor lifecycles

dholms 6eeeb023 005ff1fd

+274 -99
+31 -24
cmd/nexus/crawler.go
··· 3 3 import ( 4 4 "context" 5 5 "fmt" 6 + "log/slog" 6 7 "net/http" 7 8 8 9 comatproto "github.com/bluesky-social/indigo/api/atproto" ··· 11 12 "gorm.io/gorm" 12 13 ) 13 14 14 - func (n *Nexus) EnumerateNetwork(ctx context.Context) error { 15 - cursor, err := n.getListReposCursor(ctx) 15 + type Crawler struct { 16 + DB *gorm.DB 17 + Logger *slog.Logger 18 + RelayHost string 19 + } 20 + 21 + func (c *Crawler) EnumerateNetwork(ctx context.Context) error { 22 + cursor, err := c.getListReposCursor(ctx) 16 23 if err != nil { 17 24 return err 18 25 } 19 26 20 27 client := &xrpc.Client{ 21 28 Client: &http.Client{}, 22 - Host: n.RelayHost, 29 + Host: c.RelayHost, 23 30 } 24 31 25 32 for { ··· 50 57 break 51 58 } 52 59 53 - if err := n.db.Save(&repos).Error; err != nil { 54 - n.logger.Error("failed to save repos batch", "error", err) 60 + if err := c.DB.Save(&repos).Error; err != nil { 61 + c.Logger.Error("failed to save repos batch", "error", err) 55 62 return err 56 63 } 57 64 58 - n.logger.Info("enumerated repos batch", "count", len(repos)) 65 + c.Logger.Info("enumerated repos batch", "count", len(repos)) 59 66 60 67 if repoList.Cursor == nil || *repoList.Cursor == "" { 61 68 break 62 69 } 63 70 cursor = *repoList.Cursor 64 71 65 - if err := n.db.Save(&models.ListReposCursor{ 66 - Host: n.RelayHost, 72 + if err := c.DB.Save(&models.ListReposCursor{ 73 + Host: c.RelayHost, 67 74 Cursor: cursor, 68 75 }).Error; err != nil { 69 - n.logger.Error("failed to save lsit repos cursor", "error", err) 76 + c.Logger.Error("failed to save lsit repos cursor", "error", err) 70 77 } 71 78 } 72 79 73 - n.logger.Info("network enumeration complete") 80 + c.Logger.Info("network enumeration complete") 74 81 return nil 75 82 } 76 83 77 - func (n *Nexus) getListReposCursor(ctx context.Context) (string, error) { 84 + func (c *Crawler) getListReposCursor(ctx context.Context) (string, error) { 78 85 var dbCursor models.ListReposCursor 79 - err := n.db.Where("host = ?", n.RelayHost).First(&dbCursor).Error 86 + err := c.DB.Where("host = ?", c.RelayHost).First(&dbCursor).Error 80 87 if err != nil { 81 88 if err != gorm.ErrRecordNotFound { 82 89 return "", fmt.Errorf("failed to read list repos cursor: %w", err) ··· 86 93 return dbCursor.Cursor, nil 87 94 } 88 95 89 - func (n *Nexus) EnumerateNetworkByCollection(ctx context.Context, collection string) error { 90 - cursor, err := n.getCollectionCursor(ctx, collection) 96 + func (c *Crawler) EnumerateNetworkByCollection(ctx context.Context, collection string) error { 97 + cursor, err := c.getCollectionCursor(ctx, collection) 91 98 if err != nil { 92 99 return err 93 100 } 94 101 95 102 client := &xrpc.Client{ 96 103 Client: &http.Client{}, 97 - Host: n.RelayHost, 104 + Host: c.RelayHost, 98 105 } 99 106 100 107 for { ··· 122 129 break 123 130 } 124 131 125 - if err := n.db.Save(&repos).Error; err != nil { 126 - n.logger.Error("failed to save repos batch", "error", err) 132 + if err := c.DB.Save(&repos).Error; err != nil { 133 + c.Logger.Error("failed to save repos batch", "error", err) 127 134 return err 128 135 } 129 136 130 - n.logger.Info("enumerated repos by collection batch", "collection", collection, "count", len(repos)) 137 + c.Logger.Info("enumerated repos by collection batch", "collection", collection, "count", len(repos)) 131 138 132 139 if repoList.Cursor == nil || *repoList.Cursor == "" { 133 140 break 134 141 } 135 142 cursor = *repoList.Cursor 136 143 137 - if err := n.db.Save(&models.CollectionCursor{ 138 - Host: n.RelayHost, 144 + if err := c.DB.Save(&models.CollectionCursor{ 145 + Host: c.RelayHost, 139 146 Collection: collection, 140 147 Cursor: cursor, 141 148 }).Error; err != nil { 142 - n.logger.Error("failed to save collection cursor", "error", err) 149 + c.Logger.Error("failed to save collection cursor", "error", err) 143 150 } 144 151 } 145 152 146 - n.logger.Info("collection enumeration complete", "collection", collection) 153 + c.Logger.Info("collection enumeration complete", "collection", collection) 147 154 return nil 148 155 } 149 156 150 - func (n *Nexus) getCollectionCursor(ctx context.Context, collection string) (string, error) { 157 + func (c *Crawler) getCollectionCursor(ctx context.Context, collection string) (string, error) { 151 158 var dbCursor models.CollectionCursor 152 - err := n.db.Where("host = ? AND collection = ?", n.RelayHost, collection).First(&dbCursor).Error 159 + err := c.DB.Where("host = ? AND collection = ?", c.RelayHost, collection).First(&dbCursor).Error 153 160 if err != nil { 154 161 if err != gorm.ErrRecordNotFound { 155 162 return "", fmt.Errorf("failed to read collection cursor: %w", err)
+33 -16
cmd/nexus/handlers.go cmd/nexus/server.go
··· 1 1 package main 2 2 3 3 import ( 4 + "context" 5 + "log/slog" 4 6 "net/http" 5 7 6 8 "github.com/bluesky-social/indigo/cmd/nexus/models" 7 9 "github.com/gorilla/websocket" 8 10 "github.com/labstack/echo/v4" 11 + "gorm.io/gorm" 9 12 ) 10 13 11 - var upgrader = websocket.Upgrader{ 12 - CheckOrigin: func(r *http.Request) bool { 13 - return true 14 - }, 14 + type NexusServer struct { 15 + db *gorm.DB 16 + echo *echo.Echo 17 + logger *slog.Logger 18 + Outbox *Outbox 15 19 } 16 20 17 - func (n *Nexus) registerRoutes() { 21 + func (n *NexusServer) Start(address string) error { 22 + n.echo = echo.New() 23 + n.echo.HideBanner = true 18 24 n.echo.GET("/health", n.handleHealthcheck) 19 25 n.echo.GET("/listen", n.handleListen) 20 26 n.echo.POST("/add-repos", n.handleAddRepos) 21 27 n.echo.POST("/remove-repos", n.handleAddRepos) 28 + return n.echo.Start(address) 22 29 } 23 30 24 - func (n *Nexus) handleHealthcheck(c echo.Context) error { 31 + func (n *NexusServer) Shutdown(ctx context.Context) error { 32 + return n.echo.Shutdown(ctx) 33 + } 34 + 35 + func (n *NexusServer) handleHealthcheck(c echo.Context) error { 25 36 return c.JSON(http.StatusOK, map[string]string{ 26 37 "status": "ok", 27 38 }) 28 39 } 29 40 30 - func (n *Nexus) handleListen(c echo.Context) error { 31 - if n.outbox.mode == OutboxModeWebhook { 41 + var wsUpgrader = websocket.Upgrader{ 42 + CheckOrigin: func(r *http.Request) bool { 43 + return true 44 + }, 45 + } 46 + 47 + func (n *NexusServer) handleListen(c echo.Context) error { 48 + if n.Outbox.mode == OutboxModeWebhook { 32 49 return echo.NewHTTPError(http.StatusBadRequest, "websocket not available in webhook mode") 33 50 } 34 51 35 - ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil) 52 + ws, err := wsUpgrader.Upgrade(c.Response(), c.Request(), nil) 36 53 if err != nil { 37 54 return err 38 55 } ··· 52 69 } 53 70 54 71 // Process acks directly in websocket-ack mode 55 - if n.outbox.mode == OutboxModeWebsocketAck { 56 - n.outbox.AckEvent(msg.ID) 72 + if n.Outbox.mode == OutboxModeWebsocketAck { 73 + n.Outbox.AckEvent(msg.ID) 57 74 } 58 75 } 59 76 }() ··· 63 80 case <-disconnected: 64 81 n.logger.Info("websocket disconnected") 65 82 return nil 66 - case evt, ok := <-n.outbox.events: 83 + case evt, ok := <-n.Outbox.events: 67 84 if !ok { 68 85 return nil 69 86 } ··· 73 90 } 74 91 // In fire-and-forget mode, ack immediately after write succeeds 75 92 // In websocket-ack mode, wait for client to send ack and handle in read loop 76 - if n.outbox.mode == OutboxModeFireAndForget { 77 - n.outbox.AckEvent(evt.ID) 93 + if n.Outbox.mode == OutboxModeFireAndForget { 94 + n.Outbox.AckEvent(evt.ID) 78 95 } 79 96 } 80 97 } ··· 84 101 DIDs []string `json:"dids"` 85 102 } 86 103 87 - func (n *Nexus) handleAddRepos(c echo.Context) error { 104 + func (n *NexusServer) handleAddRepos(c echo.Context) error { 88 105 var payload DidPayload 89 106 if err := c.Bind(&payload); err != nil { 90 107 return err ··· 110 127 }) 111 128 } 112 129 113 - func (n *Nexus) handleRemoveRepos(c echo.Context) error { 130 + func (n *NexusServer) handleRemoveRepos(c echo.Context) error { 114 131 var payload DidPayload 115 132 if err := c.Bind(&payload); err != nil { 116 133 return err
+163 -32
cmd/nexus/main.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "log" 5 + "log/slog" 6 6 "os" 7 7 "os/signal" 8 8 "syscall" 9 9 "time" 10 + 11 + _ "github.com/joho/godotenv/autoload" 12 + 13 + "github.com/carlmjohnson/versioninfo" 14 + "github.com/urfave/cli/v2" 10 15 ) 11 16 12 17 func main() { 13 - nexus, err := NewNexus(NexusConfig{ 14 - DBPath: "./nexus.db", 15 - RelayHost: "https://relay1.us-east.bsky.network", 16 - // SignalCollection: "xyz.statusphere.status", 17 - FullNetworkMode: false, 18 - DisableAcks: false, 19 - ResyncParallelism: 5, 20 - }) 18 + if err := run(os.Args); err != nil { 19 + slog.Error("exiting process", "err", err.Error()) 20 + os.Exit(-1) 21 + } 22 + } 23 + 24 + func run(args []string) error { 25 + app := cli.App{ 26 + Name: "nexus", 27 + Usage: "atproto sync service", 28 + Version: versioninfo.Short(), 29 + } 30 + 31 + app.Flags = []cli.Flag{ 32 + &cli.StringFlag{ 33 + Name: "db-path", 34 + Usage: "path to SQLite database file", 35 + Value: "./nexus.db", 36 + EnvVars: []string{"NEXUS_DB_PATH"}, 37 + }, 38 + &cli.StringFlag{ 39 + Name: "relay-host", 40 + Usage: "AT Protocol relay host URL", 41 + Value: "https://bsky.network", 42 + EnvVars: []string{"NEXUS_RELAY_HOST"}, 43 + }, 44 + &cli.StringFlag{ 45 + Name: "bind", 46 + Usage: "address and port to listen on for HTTP APIs", 47 + Value: ":8080", 48 + EnvVars: []string{"NEXUS_BIND"}, 49 + }, 50 + &cli.IntFlag{ 51 + Name: "firehose-parallelism", 52 + Usage: "number of parallel firehose event processors", 53 + Value: 10, 54 + EnvVars: []string{"NEXUS_FIREHOSE_PARALLELISM"}, 55 + }, 56 + &cli.IntFlag{ 57 + Name: "resync-parallelism", 58 + Usage: "number of parallel resync workers", 59 + Value: 5, 60 + EnvVars: []string{"NEXUS_RESYNC_PARALLELISM"}, 61 + }, 62 + &cli.DurationFlag{ 63 + Name: "cursor-save-interval", 64 + Usage: "how often to save firehose cursor", 65 + Value: 0, 66 + EnvVars: []string{"NEXUS_CURSOR_SAVE_INTERVAL"}, 67 + }, 68 + &cli.BoolFlag{ 69 + Name: "full-network-mode", 70 + Usage: "enumerate and sync all repos on the network", 71 + EnvVars: []string{"NEXUS_FULL_NETWORK_MODE"}, 72 + }, 73 + &cli.StringFlag{ 74 + Name: "signal-collection", 75 + Usage: "enumerate repos by collection (exact NSID)", 76 + EnvVars: []string{"NEXUS_SIGNAL_COLLECTION"}, 77 + }, 78 + &cli.BoolFlag{ 79 + Name: "disable-acks", 80 + Usage: "disable client acknowledgments (fire-and-forget mode)", 81 + EnvVars: []string{"NEXUS_DISABLE_ACKS"}, 82 + }, 83 + &cli.StringFlag{ 84 + Name: "webhook-url", 85 + Usage: "webhook URL for event delivery (instead of WebSocket)", 86 + EnvVars: []string{"NEXUS_WEBHOOK_URL"}, 87 + }, 88 + &cli.StringSliceFlag{ 89 + Name: "collection-filters", 90 + Usage: "filter output records by collection (supports wildcards)", 91 + EnvVars: []string{"NEXUS_COLLECTION_FILTERS"}, 92 + }, 93 + &cli.StringFlag{ 94 + Name: "log-level", 95 + Usage: "log verbosity level (debug, info, warn, error)", 96 + Value: "info", 97 + EnvVars: []string{"NEXUS_LOG_LEVEL", "LOG_LEVEL"}, 98 + }, 99 + } 100 + 101 + app.Action = runNexus 102 + 103 + return app.Run(args) 104 + } 105 + 106 + func runNexus(cctx *cli.Context) error { 107 + ctx := cctx.Context 108 + logger := configLogger(cctx, os.Stdout) 109 + slog.SetDefault(logger) 110 + 111 + signals := make(chan os.Signal, 1) 112 + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 113 + 114 + config := NexusConfig{ 115 + DBPath: cctx.String("db-path"), 116 + RelayHost: cctx.String("relay-host"), 117 + FirehoseParallelism: cctx.Int("firehose-parallelism"), 118 + ResyncParallelism: cctx.Int("resync-parallelism"), 119 + FirehoseCursorSaveInterval: cctx.Duration("cursor-save-interval"), 120 + FullNetworkMode: cctx.Bool("full-network-mode"), 121 + SignalCollection: cctx.String("signal-collection"), 122 + DisableAcks: cctx.Bool("disable-acks"), 123 + WebhookURL: cctx.String("webhook-url"), 124 + CollectionFilters: cctx.StringSlice("collection-filters"), 125 + } 126 + 127 + logger.Info("creating nexus service", "config", config) 128 + nexus, err := NewNexus(config) 21 129 if err != nil { 22 - log.Fatal(err) 130 + return err 23 131 } 24 132 25 - fhCtx, fhCancel := context.WithCancel(context.Background()) 133 + logger.Info("starting nexus background workers") 134 + if err := nexus.Start(ctx); err != nil { 135 + return err 136 + } 137 + 138 + svcErr := make(chan error, 1) 26 139 go func() { 27 - err := nexus.FirehoseConsumer.Run(fhCtx) 28 - if err != nil { 29 - log.Printf("Firehose error: %v", err) 30 - } 140 + logger.Info("starting HTTP server", "addr", cctx.String("bind")) 141 + err := nexus.echo.Start(cctx.String("bind")) 142 + svcErr <- err 31 143 }() 32 144 33 - go func() { 34 - if err := nexus.Start(context.Background(), ":8080"); err != nil { 35 - log.Printf("Server error: %v", err) 145 + logger.Info("startup complete") 146 + select { 147 + case <-signals: 148 + logger.Info("received shutdown signal") 149 + case err := <-svcErr: 150 + if err != nil { 151 + logger.Error("server error", "err", err) 36 152 } 37 - }() 38 - 39 - // Wait for interrupt signal 40 - quit := make(chan os.Signal, 1) 41 - signal.Notify(quit, os.Interrupt, syscall.SIGTERM) 42 - <-quit 153 + } 43 154 44 - log.Println("Shutting down server...") 155 + logger.Info("shutting down") 156 + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 157 + defer cancel() 45 158 46 - fhCancel() 159 + if err := nexus.Shutdown(shutdownCtx); err != nil { 160 + logger.Error("error during shutdown", "err", err) 161 + return err 162 + } 47 163 48 - // Graceful shutdown with timeout 49 - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 50 - defer cancel() 164 + logger.Info("shutdown complete") 165 + return nil 166 + } 51 167 52 - if err := nexus.Shutdown(ctx); err != nil { 53 - log.Fatal(err) 168 + func configLogger(cctx *cli.Context, writer *os.File) *slog.Logger { 169 + var level slog.Level 170 + switch cctx.String("log-level") { 171 + case "debug": 172 + level = slog.LevelDebug 173 + case "info": 174 + level = slog.LevelInfo 175 + case "warn": 176 + level = slog.LevelWarn 177 + case "error": 178 + level = slog.LevelError 179 + default: 180 + level = slog.LevelInfo 54 181 } 55 182 56 - log.Println("Server stopped") 183 + logger := slog.New(slog.NewJSONHandler(writer, &slog.HandlerOptions{ 184 + Level: level, 185 + })) 186 + 187 + return logger 57 188 }
+47 -27
cmd/nexus/nexus.go
··· 8 8 9 9 comatproto "github.com/bluesky-social/indigo/api/atproto" 10 10 "github.com/bluesky-social/indigo/atproto/identity" 11 - "github.com/bluesky-social/indigo/events" 12 11 "github.com/bluesky-social/indigo/cmd/nexus/models" 13 - "github.com/labstack/echo/v4" 12 + "github.com/bluesky-social/indigo/events" 14 13 "gorm.io/driver/sqlite" 15 14 "gorm.io/gorm" 16 15 "gorm.io/gorm/logger" ··· 18 17 19 18 type Nexus struct { 20 19 db *gorm.DB 21 - echo *echo.Echo 22 20 logger *slog.Logger 23 21 24 22 Dir identity.Directory 25 23 RelayHost string 26 24 27 - outbox *Outbox 25 + Server *NexusServer 26 + Outbox *Outbox 28 27 29 28 FirehoseConsumer *FirehoseConsumer 30 29 EventProcessor *EventProcessor 30 + Crawler *Crawler 31 31 32 32 FullNetworkMode bool 33 33 CollectionFilters []string 34 + 35 + config NexusConfig 34 36 35 37 claimJobMu sync.Mutex 36 38 pdsBackoff map[string]time.Time ··· 62 64 return nil, err 63 65 } 64 66 65 - e := echo.New() 66 - e.HideBanner = true 67 - 68 67 bdir := identity.BaseDirectory{ 69 68 TryAuthoritativeDNS: false, 70 69 SkipDNSDomainSuffixes: []string{".bsky.social"}, ··· 82 81 83 82 n := &Nexus{ 84 83 db: db, 85 - echo: e, 86 84 logger: slog.Default().With("system", "nexus"), 87 85 88 86 Dir: &cdir, 89 87 RelayHost: config.RelayHost, 90 88 91 - outbox: NewOutbox(db, outboxMode, config.WebhookURL), 89 + Outbox: NewOutbox(db, outboxMode, config.WebhookURL), 92 90 93 91 FullNetworkMode: config.FullNetworkMode, 94 92 CollectionFilters: config.CollectionFilters, 95 93 94 + config: config, 95 + 96 96 pdsBackoff: make(map[string]time.Time), 97 97 } 98 98 99 + n.Server = &NexusServer{ 100 + db: db, 101 + logger: n.logger.With("component", "server"), 102 + Outbox: n.Outbox, 103 + } 104 + 99 105 firehoseParallelism := config.FirehoseParallelism 100 106 if firehoseParallelism == 0 { 101 107 firehoseParallelism = 10 ··· 111 117 DB: db, 112 118 Dir: n.Dir, 113 119 RelayHost: config.RelayHost, 114 - Outbox: n.outbox, 120 + Outbox: n.Outbox, 115 121 FullNetworkMode: config.FullNetworkMode, 116 122 CollectionFilters: config.CollectionFilters, 117 123 } ··· 139 145 GetCursor: n.EventProcessor.ReadLastCursor, 140 146 } 141 147 142 - // crash recovery: reset any partially repos 148 + n.Crawler = &Crawler{ 149 + Logger: n.logger.With("component", "crawler"), 150 + DB: db, 151 + RelayHost: config.RelayHost, 152 + } 153 + 154 + return n, nil 155 + } 156 + 157 + func (n *Nexus) Start(ctx context.Context) error { 143 158 if err := n.resetPartiallyResynced(); err != nil { 144 - return nil, err 159 + return err 145 160 } 146 161 147 - if config.SignalCollection != "" { 162 + if n.config.SignalCollection != "" { 148 163 go func() { 149 - if err := n.EnumerateNetworkByCollection(context.Background(), config.SignalCollection); err != nil { 150 - n.logger.Error("collection enumeration failed", "error", err, "collection", config.SignalCollection) 164 + if err := n.Crawler.EnumerateNetworkByCollection(ctx, n.config.SignalCollection); err != nil { 165 + n.logger.Error("collection enumeration failed", "error", err, "collection", n.config.SignalCollection) 151 166 } 152 167 }() 153 - } else if config.FullNetworkMode { 168 + } else if n.config.FullNetworkMode { 154 169 go func() { 155 - if err := n.EnumerateNetwork(context.Background()); err != nil { 170 + if err := n.Crawler.EnumerateNetwork(ctx); err != nil { 156 171 n.logger.Error("network enumeration failed", "error", err) 157 172 } 158 173 }() 159 174 } 160 175 161 - resyncParallelism := config.ResyncParallelism 176 + resyncParallelism := n.config.ResyncParallelism 162 177 if resyncParallelism == 0 { 163 178 resyncParallelism = 5 164 179 } 165 180 for i := 0; i < resyncParallelism; i++ { 166 - go n.runResyncWorker(context.Background(), i) 181 + go n.runResyncWorker(ctx, i) 167 182 } 168 183 169 - go n.EventProcessor.RunCursorSaver(context.Background(), cursorSaveInterval) 170 - go n.outbox.Run(context.Background()) 184 + cursorSaveInterval := n.config.FirehoseCursorSaveInterval 185 + if cursorSaveInterval == 0 { 186 + cursorSaveInterval = 5 * time.Second 187 + } 188 + go n.EventProcessor.RunCursorSaver(ctx, cursorSaveInterval) 171 189 172 - n.registerRoutes() 190 + go n.Outbox.Run(ctx) 173 191 174 - return n, nil 175 - } 192 + return n.FirehoseConsumer.Run(ctx) 193 + go func() { 194 + if err := n.FirehoseConsumer.Run(ctx); err != nil { 195 + n.logger.Error("firehose error", "error", err) 196 + } 197 + }() 176 198 177 - func (n *Nexus) Start(ctx context.Context, addr string) error { 178 - n.logger.Info("starting nexus server", "addr", addr) 179 - return n.echo.Start(addr) 199 + return nil 180 200 } 181 201 182 202 func (n *Nexus) Shutdown(ctx context.Context) error {