this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

bunch of tidy

dholms 7b2e3f3a e77417be

+103 -73
+14 -7
cmd/nexus/crawler.go
··· 5 5 "fmt" 6 6 "log/slog" 7 7 "net/http" 8 + "time" 8 9 9 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 10 11 "github.com/bluesky-social/indigo/cmd/nexus/models" ··· 18 19 RelayHost string 19 20 } 20 21 22 + // EnumerateNetwork discovers and tracks all repositories on the network. 21 23 func (c *Crawler) EnumerateNetwork(ctx context.Context) error { 22 24 cursor, err := c.getListReposCursor(ctx) 23 25 if err != nil { ··· 25 27 } 26 28 27 29 client := &xrpc.Client{ 28 - Client: &http.Client{}, 29 - Host: c.RelayHost, 30 + Client: &http.Client{ 31 + Timeout: 30 * time.Second, 32 + }, 33 + Host: c.RelayHost, 30 34 } 31 35 32 36 for { ··· 58 62 } 59 63 60 64 if err := c.DB.Save(&repos).Error; err != nil { 61 - c.Logger.Error("failed to save repos batch", "error", err) 65 + c.Logger.Error("failed to save repos batch", "error", err, "count", len(repos)) 62 66 return err 63 67 } 64 68 ··· 73 77 Host: c.RelayHost, 74 78 Cursor: cursor, 75 79 }).Error; err != nil { 76 - c.Logger.Error("failed to save lsit repos cursor", "error", err) 80 + c.Logger.Error("failed to save list repos cursor", "error", err) 77 81 } 78 82 } 79 83 ··· 93 97 return dbCursor.Cursor, nil 94 98 } 95 99 100 + // EnumerateNetworkByCollection discovers repositories that have records in the specified collection. 96 101 func (c *Crawler) EnumerateNetworkByCollection(ctx context.Context, collection string) error { 97 102 cursor, err := c.getCollectionCursor(ctx, collection) 98 103 if err != nil { ··· 100 105 } 101 106 102 107 client := &xrpc.Client{ 103 - Client: &http.Client{}, 104 - Host: c.RelayHost, 108 + Client: &http.Client{ 109 + Timeout: 30 * time.Second, 110 + }, 111 + Host: c.RelayHost, 105 112 } 106 113 107 114 for { ··· 130 137 } 131 138 132 139 if err := c.DB.Save(&repos).Error; err != nil { 133 - c.Logger.Error("failed to save repos batch", "error", err) 140 + c.Logger.Error("failed to save repos batch", "error", err, "collection", collection, "count", len(repos)) 134 141 return err 135 142 } 136 143
+11 -10
cmd/nexus/firehose.go
··· 21 21 GetCursor func(ctx context.Context, relayHost string) (int64, error) 22 22 } 23 23 24 - func (fc *FirehoseConsumer) Run(ctx context.Context) error { 24 + // Run connects to the firehose and processes events until context cancellation or error. 25 + func (f *FirehoseConsumer) Run(ctx context.Context) error { 25 26 26 - u, err := url.Parse(fc.RelayHost) 27 + u, err := url.Parse(f.RelayHost) 27 28 if err != nil { 28 29 return fmt.Errorf("invalid relayHost URI: %w", err) 29 30 } ··· 43 44 default: 44 45 } 45 46 46 - cursor, err := fc.GetCursor(ctx, fc.RelayHost) 47 + cursor, err := f.GetCursor(ctx, f.RelayHost) 47 48 if err != nil { 48 49 return fmt.Errorf("failed to read cursor: %w", err) 49 50 } ··· 53 54 } 54 55 urlStr := u.String() 55 56 56 - fc.Logger.Info("connecting to firehose", "url", urlStr, "cursor", cursor, "retries", retries) 57 + f.Logger.Info("connecting to firehose", "url", urlStr, "cursor", cursor, "retries", retries) 57 58 58 59 dialer := websocket.DefaultDialer 59 60 con, _, err := dialer.DialContext(ctx, urlStr, http.Header{}) 60 61 if err != nil { 61 - fc.Logger.Warn("dialing failed", "err", err, "retries", retries) 62 + f.Logger.Warn("dialing failed", "error", err, "retries", retries) 62 63 time.Sleep(backoff(retries, 10)) 63 64 retries++ 64 65 continue 65 66 } 66 67 67 - fc.Logger.Info("connected to firehose") 68 + f.Logger.Info("connected to firehose") 68 69 retries = 0 69 70 70 71 scheduler := parallel.NewScheduler( 71 - fc.Parallelism, 72 + f.Parallelism, 72 73 100, 73 - fc.RelayHost, 74 - fc.Callbacks.EventHandler, 74 + f.RelayHost, 75 + f.Callbacks.EventHandler, 75 76 ) 76 77 if err := events.HandleRepoStream(ctx, con, scheduler, nil); err != nil { 77 - fc.Logger.Warn("firehose connection failed", "err", err) 78 + f.Logger.Warn("firehose connection failed", "error", err) 78 79 } 79 80 } 80 81 }
+3 -3
cmd/nexus/main.go
··· 16 16 17 17 func main() { 18 18 if err := run(os.Args); err != nil { 19 - slog.Error("exiting process", "err", err.Error()) 19 + slog.Error("exiting process", "error", err) 20 20 os.Exit(-1) 21 21 } 22 22 } ··· 168 168 logger.Info("received shutdown signal") 169 169 case err := <-svcErr: 170 170 if err != nil { 171 - logger.Error("service error", "err", err) 171 + logger.Error("service error", "error", err) 172 172 } 173 173 } 174 174 ··· 179 179 defer shutdownCancel() 180 180 181 181 if err := nexus.Server.Shutdown(shutdownCtx); err != nil { 182 - logger.Error("error during shutdown", "err", err) 182 + logger.Error("error during shutdown", "error", err) 183 183 return err 184 184 } 185 185
+1
cmd/nexus/nexus.go
··· 158 158 return n, nil 159 159 } 160 160 161 + // Run starts internal background workers for resync, cursor saving, and outbox delivery. 161 162 func (n *Nexus) Run(ctx context.Context) { 162 163 resyncParallelism := n.config.ResyncParallelism 163 164 if resyncParallelism == 0 {
+11 -2
cmd/nexus/outbox.go
··· 80 80 } 81 81 } 82 82 83 + // Run starts the outbox workers for event delivery and cleanup. 83 84 func (o *Outbox) Run(ctx context.Context) { 84 85 o.ctx = ctx 85 86 ··· 120 121 Order("id ASC"). 121 122 Limit(batchSize). 122 123 Find(&events).Error; err != nil { 123 - o.logger.Error("failed to load events into cache", "error", err) 124 + o.logger.Error("failed to load events into cache", "error", err, "lastID", lastID) 124 125 return 125 126 } 126 127 ··· 204 205 } 205 206 } 206 207 208 + // AckEvent marks an event as delivered and queues it for deletion. 207 209 func (o *Outbox) AckEvent(eventID uint) { 208 210 o.cacheMu.RLock() 209 211 outboxEvt, exists := o.eventCache[eventID] ··· 381 383 // returns when it hits a blocking event 382 384 func (w *DIDWorker) processPendingEvts() { 383 385 for { 386 + select { 387 + case <-w.ctx.Done(): 388 + return 389 + default: 390 + } 391 + 384 392 w.mu.Lock() 385 393 if w.blockedOnLive { 386 - return // can't proceed, break out of send loop and wait for acks 394 + w.mu.Unlock() 395 + return 387 396 } 388 397 389 398 if len(w.pendingEvts) == 0 {
+19 -13
cmd/nexus/processor.go
··· 31 31 lastSeq atomic.Int64 32 32 } 33 33 34 + // ProcessCommit validates and applies a commit event from the firehose. 34 35 func (ep *EventProcessor) ProcessCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 35 - defer ep.lastSeq.Swap(evt.Seq) 36 + defer ep.lastSeq.Store(evt.Seq) 36 37 37 38 curr, err := ep.GetRepoState(evt.Repo) 38 39 if err != nil { ··· 167 168 return commit, nil 168 169 } 169 170 171 + // ProcessSync handles sync events and marks repos for resync if needed. 170 172 func (ep *EventProcessor) ProcessSync(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Sync) error { 171 - defer ep.lastSeq.Swap(evt.Seq) 173 + defer ep.lastSeq.Store(evt.Seq) 172 174 173 175 curr, err := ep.GetRepoState(evt.Did) 174 176 if err != nil { ··· 212 214 } 213 215 214 216 func (ep *EventProcessor) ProcessIdentity(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) error { 215 - defer ep.lastSeq.Swap(evt.Seq) 217 + defer ep.lastSeq.Store(evt.Seq) 216 218 return ep.RefreshIdentity(ctx, evt.Did) 217 219 } 218 220 221 + // RefreshIdentity fetches the latest identity information for a DID. 219 222 func (ep *EventProcessor) RefreshIdentity(ctx context.Context, did string) error { 220 223 curr, err := ep.GetRepoState(did) 221 224 if err != nil { ··· 269 272 } 270 273 271 274 func (ep *EventProcessor) ProcessAccount(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Account) error { 272 - defer ep.lastSeq.Swap(evt.Seq) 275 + defer ep.lastSeq.Store(evt.Seq) 273 276 274 277 curr, err := ep.GetRepoState(evt.Did) 275 278 if err != nil { ··· 288 291 var updateTo models.AccountStatus 289 292 if evt.Active { 290 293 updateTo = models.AccountStatusActive 291 - } else if *evt.Status == string(models.AccountStatusDeactivated) || *evt.Status == string(models.AccountStatusTakendown) || *evt.Status == string(models.AccountStatusSuspended) || *evt.Status == string(models.AccountStatusDeleted) { 294 + } else if evt.Status != nil && (*evt.Status == string(models.AccountStatusDeactivated) || *evt.Status == string(models.AccountStatusTakendown) || *evt.Status == string(models.AccountStatusSuspended) || *evt.Status == string(models.AccountStatusDeleted)) { 292 295 updateTo = models.AccountStatus(*evt.Status) 293 296 } else { 294 297 // no-op for other events such as throttled or desynchronized ··· 434 437 }) 435 438 } 436 439 437 - if len(outboxBatch) > 0 { 438 - if err := tx.CreateInBatches(outboxBatch, 100).Error; err != nil { 439 - return err 440 - } 441 - } 440 + return batchInsertOutboxEvents(tx, outboxBatch) 441 + } 442 442 443 - return nil 443 + func batchInsertOutboxEvents(tx *gorm.DB, events []*models.OutboxBuffer) error { 444 + if len(events) == 0 { 445 + return nil 446 + } 447 + return tx.CreateInBatches(events, 100).Error 444 448 } 445 449 446 450 func deleteRepo(tx *gorm.DB, did string) error { ··· 478 482 }).Error 479 483 } 480 484 485 + // RunCursorSaver periodically saves the firehose cursor to the database. 481 486 func (ep *EventProcessor) RunCursorSaver(ctx context.Context, interval time.Duration) { 482 487 ticker := time.NewTicker(interval) 483 488 defer ticker.Stop() ··· 486 491 select { 487 492 case <-ctx.Done(): 488 493 if err := ep.saveCursor(ctx); err != nil { 489 - ep.Logger.Error("failed to save cursor on shutdown", "error", err) 494 + ep.Logger.Error("failed to save cursor on shutdown", "error", err, "relayHost", ep.RelayHost) 490 495 } 491 496 return 492 497 case <-ticker.C: 493 498 if err := ep.saveCursor(ctx); err != nil { 494 - ep.Logger.Error("failed to save cursor", "error", err) 499 + ep.Logger.Error("failed to save cursor", "error", err, "relayHost", ep.RelayHost) 495 500 } 496 501 } 497 502 } ··· 526 531 Update("state", state).Error 527 532 } 528 533 534 + // EnsureRepo creates or updates a repository record in the database. 529 535 func (ep *EventProcessor) EnsureRepo(did string) error { 530 536 return ep.DB.Save(&models.Repo{ 531 537 Did: did,
+14 -9
cmd/nexus/resync.go
··· 88 88 return nil 89 89 } 90 90 91 - const BATCH_SIZE = 100 91 + const batchSize = 100 92 92 93 93 func (n *Nexus) doResync(ctx context.Context, did string) (bool, error) { 94 94 ident, err := n.Dir.LookupDID(ctx, syntax.DID(did)) ··· 111 111 n.logger.Info("fetching repo from PDS", "did", did, "pds", pdsURL) 112 112 113 113 client := &xrpc.Client{ 114 - Client: &http.Client{}, 115 - Host: pdsURL, 114 + Client: &http.Client{ 115 + Timeout: 30 * time.Second, 116 + }, 117 + Host: pdsURL, 116 118 } 117 119 118 120 repoBytes, err := comatproto.SyncGetRepo(ctx, client, did, "") ··· 151 153 var evtBatch []*RecordEvt 152 154 153 155 err = r.MST.Walk(func(recPathBytes []byte, recCid cid.Cid) error { 156 + select { 157 + case <-ctx.Done(): 158 + return ctx.Err() 159 + default: 160 + } 161 + 154 162 recPath := string(recPathBytes) 155 163 collection, rkey, err := syntax.ParseRepoPath(recPath) 156 164 if err != nil { ··· 201 209 } 202 210 evtBatch = append(evtBatch, evt) 203 211 204 - if len(evtBatch) >= BATCH_SIZE { 212 + if len(evtBatch) >= batchSize { 205 213 if err := n.writeBatch(evtBatch); err != nil { 206 214 n.logger.Error("failed to flush batch", "error", err, "did", did) 207 215 return err ··· 279 287 return err 280 288 } 281 289 } 282 - if len(outboxBatch) > 0 { 283 - return tx.CreateInBatches(outboxBatch, 100).Error 284 - } 285 - return nil 290 + return batchInsertOutboxEvents(tx, outboxBatch) 286 291 }) 287 292 } 288 293 ··· 303 308 } 304 309 305 310 // start a 1 min & go up to 1 hr between retries 306 - retryAfter := time.Now().Add(60 * backoff(repo.RetryCount, 60)) 311 + retryAfter := time.Now().Add(backoff(repo.RetryCount, 60)) 307 312 308 313 dbErr := n.db.Model(&models.Repo{}). 309 314 Where("did = ?", did).
+30 -29
cmd/nexus/server.go
··· 18 18 Outbox *Outbox 19 19 } 20 20 21 - func (n *NexusServer) Start(address string) error { 22 - n.echo = echo.New() 23 - n.echo.HideBanner = true 24 - n.echo.GET("/health", n.handleHealthcheck) 25 - n.echo.GET("/listen", n.handleListen) 26 - n.echo.POST("/add-repos", n.handleAddRepos) 27 - n.echo.POST("/remove-repos", n.handleAddRepos) 28 - return n.echo.Start(address) 21 + func (ns *NexusServer) Start(address string) error { 22 + ns.echo = echo.New() 23 + ns.echo.HideBanner = true 24 + ns.echo.GET("/health", ns.handleHealthcheck) 25 + ns.echo.GET("/listen", ns.handleListen) 26 + ns.echo.POST("/add-repos", ns.handleAddRepos) 27 + ns.echo.POST("/remove-repos", ns.handleRemoveRepos) 28 + return ns.echo.Start(address) 29 29 } 30 30 31 - func (n *NexusServer) Shutdown(ctx context.Context) error { 32 - return n.echo.Shutdown(ctx) 31 + // Shutdown gracefully shuts down the HTTP server. 32 + func (ns *NexusServer) Shutdown(ctx context.Context) error { 33 + return ns.echo.Shutdown(ctx) 33 34 } 34 35 35 - func (n *NexusServer) handleHealthcheck(c echo.Context) error { 36 + func (ns *NexusServer) handleHealthcheck(c echo.Context) error { 36 37 return c.JSON(http.StatusOK, map[string]string{ 37 38 "status": "ok", 38 39 }) ··· 44 45 }, 45 46 } 46 47 47 - func (n *NexusServer) handleListen(c echo.Context) error { 48 - if n.Outbox.mode == OutboxModeWebhook { 48 + func (ns *NexusServer) handleListen(c echo.Context) error { 49 + if ns.Outbox.mode == OutboxModeWebhook { 49 50 return echo.NewHTTPError(http.StatusBadRequest, "websocket not available in webhook mode") 50 51 } 51 52 ··· 55 56 } 56 57 defer ws.Close() 57 58 58 - n.logger.Info("websocket connected") 59 + ns.logger.Info("websocket connected") 59 60 60 61 // read loop to detect disconnects and handle acks in websocket-ack mode 61 62 disconnected := make(chan struct{}) ··· 69 70 } 70 71 71 72 // Process acks directly in websocket-ack mode 72 - if n.Outbox.mode == OutboxModeWebsocketAck { 73 - n.Outbox.AckEvent(msg.ID) 73 + if ns.Outbox.mode == OutboxModeWebsocketAck { 74 + ns.Outbox.AckEvent(msg.ID) 74 75 } 75 76 } 76 77 }() ··· 78 79 for { 79 80 select { 80 81 case <-disconnected: 81 - n.logger.Info("websocket disconnected") 82 + ns.logger.Info("websocket disconnected") 82 83 return nil 83 - case evt, ok := <-n.Outbox.events: 84 + case evt, ok := <-ns.Outbox.events: 84 85 if !ok { 85 86 return nil 86 87 } 87 88 if err := ws.WriteJSON(evt); err != nil { 88 - n.logger.Info("websocket write error", "error", err) 89 + ns.logger.Info("websocket write error", "error", err) 89 90 return nil 90 91 } 91 92 // In fire-and-forget mode, ack immediately after write succeeds 92 93 // In websocket-ack mode, wait for client to send ack and handle in read loop 93 - if n.Outbox.mode == OutboxModeFireAndForget { 94 - n.Outbox.AckEvent(evt.ID) 94 + if ns.Outbox.mode == OutboxModeFireAndForget { 95 + ns.Outbox.AckEvent(evt.ID) 95 96 } 96 97 } 97 98 } ··· 101 102 DIDs []string `json:"dids"` 102 103 } 103 104 104 - func (n *NexusServer) handleAddRepos(c echo.Context) error { 105 + func (ns *NexusServer) handleAddRepos(c echo.Context) error { 105 106 var payload DidPayload 106 107 if err := c.Bind(&payload); err != nil { 107 108 return err ··· 115 116 } 116 117 } 117 118 118 - if err := n.db.Save(&dids).Error; err != nil { 119 - n.logger.Error("failed to upsert dids", "error", err) 119 + if err := ns.db.Save(&dids).Error; err != nil { 120 + ns.logger.Error("failed to upsert dids", "error", err) 120 121 return echo.NewHTTPError(http.StatusInternalServerError) 121 122 } 122 123 123 - n.logger.Info("added dids", "count", len(payload.DIDs)) 124 + ns.logger.Info("added dids", "count", len(payload.DIDs)) 124 125 125 126 return c.JSON(http.StatusOK, map[string]interface{}{ 126 127 "count": len(payload.DIDs), 127 128 }) 128 129 } 129 130 130 - func (n *NexusServer) handleRemoveRepos(c echo.Context) error { 131 + func (ns *NexusServer) handleRemoveRepos(c echo.Context) error { 131 132 var payload DidPayload 132 133 if err := c.Bind(&payload); err != nil { 133 134 return err 134 135 } 135 136 136 137 for _, did := range payload.DIDs { 137 - err := deleteRepo(n.db, did) 138 + err := deleteRepo(ns.db, did) 138 139 if err != nil { 139 - n.logger.Error("failed to delete repo", "error", err) 140 + ns.logger.Error("failed to delete repo", "error", err) 140 141 return echo.NewHTTPError(http.StatusInternalServerError) 141 142 } 142 143 } 143 144 144 - n.logger.Info("added dids", "count", len(payload.DIDs)) 145 + ns.logger.Info("removed dids", "count", len(payload.DIDs)) 145 146 146 147 return c.JSON(http.StatusOK, map[string]interface{}{ 147 148 "count": len(payload.DIDs),