this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

maintain path -> cid mapping instead of backfill buffer

dholms 309f20e3 a88d55fe

+124 -135
+55 -63
nexus/backfill.go
··· 17 17 ) 18 18 19 19 func (n *Nexus) backfillDid(ctx context.Context, did string) { 20 - // Mark as backfilling 21 20 if err := n.UpdateRepoState(did, models.RepoStateBackfilling, "", ""); err != nil { 22 21 n.logger.Error("failed to update state to backfilling", "error", err, "did", did) 23 22 return ··· 25 24 26 25 n.logger.Info("starting backfill", "did", did) 27 26 28 - // Perform backfill 29 27 rev, err := n.backfillRepo(ctx, did) 30 28 if err != nil { 31 29 n.logger.Error("backfill failed", "error", err, "did", did) 32 - // Mark as error state 33 30 if updateErr := n.UpdateRepoState(did, models.RepoStateError, "", err.Error()); updateErr != nil { 34 31 n.logger.Error("failed to update state to error", "error", updateErr, "did", did) 35 32 } 36 - // Clean up buffered events 37 - n.mu.Lock() 38 - delete(n.backfillBuffer, did) 39 - n.mu.Unlock() 40 33 return 41 34 } 42 35 43 - // Drain buffered events that came in during backfill 44 - if err := n.drainBackfillBuffer(ctx, did); err != nil { 45 - n.logger.Error("failed to drain backfill buffer", "error", err, "did", did) 46 - if updateErr := n.UpdateRepoState(did, models.RepoStateError, rev, err.Error()); updateErr != nil { 47 - n.logger.Error("failed to update state to error", "error", updateErr, "did", did) 48 - } 49 - return 50 - } 51 - 52 - // Mark as active 53 36 if err := n.UpdateRepoState(did, models.RepoStateActive, rev, ""); err != nil { 54 37 n.logger.Error("failed to update state to active", "error", err, "did", did) 55 38 return ··· 58 41 n.logger.Info("backfill complete", "did", did, "rev", rev) 59 42 } 60 43 61 - func (n *Nexus) drainBackfillBuffer(ctx context.Context, did string) error { 62 - // Get buffered events from memory 63 - n.mu.Lock() 64 - bufferedOps := n.backfillBuffer[did] 65 - delete(n.backfillBuffer, did) 66 - n.mu.Unlock() 67 - 68 - if len(bufferedOps) == 0 { 69 - return nil 70 - } 71 - 72 - n.logger.Info("draining backfill buffer from memory", "did", did, "count", len(bufferedOps)) 73 - 74 - // Send each buffered event 75 - for _, op := range bufferedOps { 76 - if err := n.outbox.Send(op); err != nil { 77 - return err 78 - } 79 - } 80 - 81 - n.logger.Info("drained backfill buffer", "did", did, "count", len(bufferedOps)) 82 - return nil 83 - } 84 - 85 44 func (n *Nexus) backfillRepo(ctx context.Context, did string) (string, error) { 86 45 // Resolve DID to PDS 87 46 ident, err := n.Dir.LookupDID(ctx, syntax.DID(did)) ··· 89 48 return "", fmt.Errorf("failed to resolve DID: %w", err) 90 49 } 91 50 92 - pdsHost := ident.PDSEndpoint() 93 - if pdsHost == "" { 51 + pdsURL := ident.PDSEndpoint() 52 + if pdsURL == "" { 94 53 return "", fmt.Errorf("no PDS endpoint for DID: %s", did) 95 54 } 96 55 97 - n.logger.Info("fetching repo from PDS", "did", did, "pds", pdsHost) 56 + n.logger.Info("fetching repo from PDS", "did", did, "pds", pdsURL) 98 57 99 58 // Create XRPC client 100 59 client := &xrpc.Client{ 101 60 Client: &http.Client{}, 102 - Host: pdsHost, 61 + Host: pdsURL, 103 62 } 104 63 105 64 // Call com.atproto.sync.getRepo ··· 119 78 rev := r.SignedCommit().Rev 120 79 n.logger.Info("iterating repo records", "did", did, "rev", rev) 121 80 81 + // Pre-load existing CID mappings for this DID into memory 82 + var existingRecords []models.RepoRecord 83 + if err := n.db.Find(&existingRecords, "did = ?", did).Error; err != nil { 84 + return "", fmt.Errorf("failed to load existing records: %w", err) 85 + } 86 + 87 + // Build map: "collection/rkey" -> CID 88 + existingCids := make(map[string]string, len(existingRecords)) 89 + for _, rec := range existingRecords { 90 + key := rec.Collection + "/" + rec.Rkey 91 + existingCids[key] = rec.Cid 92 + } 93 + n.logger.Info("pre-loaded existing records", "did", did, "count", len(existingCids)) 94 + 122 95 numRecords := 0 123 96 124 - // Iterate through all records 125 - err = r.ForEach(ctx, "", func(recordPath string, nodeCid cid.Cid) error { 126 - // Get the record bytes 127 - blk, err := r.Blockstore().Get(ctx, nodeCid) 97 + err = r.ForEach(ctx, "", func(recPath string, recCid cid.Cid) error { 98 + collection, rkey, err := syntax.ParseRepoPath(recPath) 128 99 if err != nil { 129 - n.logger.Error("failed to get block", "path", recordPath, "error", err) 130 - return nil // Skip this record 100 + n.logger.Error("invalid record path", "path", recPath, "error", err) 101 + return nil 131 102 } 132 103 133 - raw := blk.RawData() 104 + collStr := collection.String() 105 + rkeyStr := rkey.String() 106 + cidStr := recCid.String() 134 107 135 - // Unmarshal to get the actual record 136 - rec, err := data.UnmarshalCBOR(raw) 108 + existingCid, exists := existingCids[recPath] 109 + action := "create" 110 + if exists { 111 + if existingCid == cidStr { 112 + return nil 113 + } else { 114 + action = "update" 115 + } 116 + } 117 + 118 + blk, err := r.Blockstore().Get(ctx, recCid) 137 119 if err != nil { 138 - n.logger.Error("failed to unmarshal record", "path", recordPath, "error", err) 120 + n.logger.Error("failed to get block", "path", recPath, "error", err) 139 121 return nil 140 122 } 141 123 142 - // Parse the path to get collection and rkey 143 - collection, rkey, err := syntax.ParseRepoPath(recordPath) 124 + rec, err := data.UnmarshalCBOR(blk.RawData()) 144 125 if err != nil { 145 - n.logger.Error("invalid record path", "path", recordPath, "error", err) 126 + n.logger.Error("failed to unmarshal record", "path", recPath, "error", err) 146 127 return nil 147 128 } 148 129 149 - // Send as create event 150 130 op := &Op{ 151 131 Did: did, 152 - Collection: collection.String(), 153 - Rkey: rkey.String(), 154 - Action: "create", 132 + Collection: collStr, 133 + Rkey: rkeyStr, 134 + Action: action, 155 135 Record: rec, 156 - Cid: nodeCid.String(), 136 + Cid: recCid.String(), 157 137 } 158 138 159 139 if err := n.outbox.Send(op); err != nil { 160 140 return fmt.Errorf("failed to send op: %w", err) 141 + } 142 + 143 + // Update RepoRecord table with new CID 144 + repoRecord := models.RepoRecord{ 145 + Did: did, 146 + Collection: collStr, 147 + Rkey: rkeyStr, 148 + Cid: cidStr, 149 + Rev: rev, 150 + } 151 + if err := n.db.Save(&repoRecord).Error; err != nil { 152 + n.logger.Error("failed to save repo record", "error", err, "did", did, "path", recPath) 161 153 } 162 154 163 155 numRecords++
+49 -38
nexus/firehose.go
··· 77 77 return nil 78 78 } 79 79 80 - // Check repo state from database 81 - state, err := nexus.GetRepoState(evt.Repo) 82 - if err != nil { 83 - nexus.logger.Error("failed to get repo state", "did", evt.Repo, "error", err) 84 - return nil // Don't fail on state lookup errors 85 - } 86 - 87 80 r, err := repo.VerifyCommitMessage(ctx, evt) 88 81 if err != nil { 89 82 nexus.logger.Info("failed to verify commit", "did", evt.Repo, "err", err) ··· 96 89 return err 97 90 } 98 91 92 + collStr := coll.String() 93 + rkeyStr := rkey.String() 94 + 95 + if op.Action == "delete" { 96 + outOp := &Op{ 97 + Did: evt.Repo, 98 + Collection: collStr, 99 + Rkey: rkeyStr, 100 + Action: "delete", 101 + } 102 + 103 + if err := nexus.outbox.Send(outOp); err != nil { 104 + return err 105 + } 106 + 107 + if err := nexus.db.Where("did = ? AND collection = ? AND rkey = ?", evt.Repo, collStr, rkeyStr).Delete(&models.RepoRecord{}).Error; err != nil { 108 + nexus.logger.Error("failed to delete repo record", "error", err, "did", evt.Repo, "path", op.Path) 109 + } 110 + continue 111 + } 112 + 113 + recBytes, recCid, err := r.GetRecordBytes(ctx, coll, rkey) 114 + if err != nil { 115 + return err 116 + } 117 + cidStr := recCid.String() 118 + 119 + rec, err := data.UnmarshalCBOR(recBytes) 120 + if err != nil { 121 + return err 122 + } 123 + 99 124 outOp := &Op{ 100 125 Did: evt.Repo, 101 - Collection: coll.String(), 102 - Rkey: rkey.String(), 126 + Collection: collStr, 127 + Rkey: rkeyStr, 103 128 Action: op.Action, 129 + Record: rec, 130 + Cid: cidStr, 104 131 } 105 132 106 - // For creates and updates, get the record 107 - if op.Action == "create" || op.Action == "update" { 108 - recBytes, recCid, err := r.GetRecordBytes(ctx, coll, rkey) 109 - if err != nil { 110 - return err 111 - } 112 - rec, err := data.UnmarshalCBOR(recBytes) 113 - if err != nil { 114 - return err 115 - } 116 - outOp.Record = rec 117 - outOp.Cid = recCid.String() 133 + if err := nexus.outbox.Send(outOp); err != nil { 134 + return err 118 135 } 119 136 120 - // If backfill is in progress, buffer to memory 121 - if state == models.RepoStatePending || state == models.RepoStateBackfilling { 122 - nexus.logger.Debug("buffering event to memory during backfill", "did", evt.Repo, "state", state) 123 - nexus.mu.Lock() 124 - nexus.backfillBuffer[evt.Repo] = append(nexus.backfillBuffer[evt.Repo], outOp) 125 - nexus.mu.Unlock() 126 - } else { 127 - // Normal flow - send through outbox 128 - err = nexus.outbox.Send(outOp) 129 - if err != nil { 130 - return err 131 - } 137 + repoRecord := models.RepoRecord{ 138 + Did: evt.Repo, 139 + Collection: collStr, 140 + Rkey: rkeyStr, 141 + Cid: cidStr, 142 + Rev: evt.Rev, 143 + } 144 + if err := nexus.db.Save(&repoRecord).Error; err != nil { 145 + nexus.logger.Error("failed to save repo record", "error", err, "did", evt.Repo, "path", op.Path) 132 146 } 133 147 } 134 148 135 - // Update rev if repo is active 136 - if state == models.RepoStateActive { 137 - if err := nexus.UpdateRepoState(evt.Repo, models.RepoStateActive, evt.Rev, ""); err != nil { 138 - nexus.logger.Error("failed to update rev", "did", evt.Repo, "error", err) 139 - } 149 + if err := nexus.UpdateRepoState(evt.Repo, models.RepoStateActive, evt.Rev, ""); err != nil { 150 + nexus.logger.Error("failed to update rev", "did", evt.Repo, "error", err) 140 151 } 141 152 142 153 return nil
+2 -10
nexus/handlers.go
··· 28 28 } 29 29 30 30 func (n *Nexus) handleListen(c echo.Context) error { 31 - // Upgrade to WebSocket 32 31 ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil) 33 32 if err != nil { 34 33 return err ··· 37 36 38 37 n.logger.Info("websocket connected") 39 38 40 - // Subscribe to outbox - it handles draining DB and streaming live events 41 39 return n.outbox.Subscribe(c.Request().Context(), func(op *Op) error { 42 40 return ws.WriteJSON(op) 43 41 }) ··· 53 51 return err 54 52 } 55 53 56 - // Start transaction 57 54 tx := n.db.Begin() 58 55 defer tx.Rollback() 59 56 60 57 var newDids []string 61 58 62 59 for _, did := range payload.DIDs { 63 - // Check if already exists 64 60 var existing models.FilterDid 65 61 err := tx.First(&existing, "did = ?", did).Error 66 62 67 63 if err == nil { 68 - // Already exists, skip 69 64 n.logger.Info("did already tracked", "did", did, "state", existing.State) 70 65 continue 71 66 } 72 67 73 - // Add to filter list with pending state 74 68 filterDid := &models.FilterDid{ 75 69 Did: did, 76 70 State: models.RepoStatePending, ··· 80 74 return echo.NewHTTPError(http.StatusInternalServerError) 81 75 } 82 76 83 - // Add to in-memory filter 84 77 n.mu.Lock() 85 78 n.filterDids[did] = true 86 79 n.mu.Unlock() ··· 93 86 return echo.NewHTTPError(http.StatusInternalServerError) 94 87 } 95 88 96 - // Kick off backfill for each new DID 97 89 for _, did := range newDids { 98 90 go n.backfillDid(context.Background(), did) 99 91 } ··· 101 93 n.logger.Info("added dids and started backfills", "new", len(newDids), "total", len(payload.DIDs)) 102 94 103 95 return c.JSON(http.StatusOK, map[string]interface{}{ 104 - "added": len(newDids), 105 - "total": len(payload.DIDs), 96 + "added": len(newDids), 97 + "total": len(payload.DIDs), 106 98 }) 107 99 }
+14 -5
nexus/models/models.go
··· 5 5 type RepoState string 6 6 7 7 const ( 8 - RepoStatePending RepoState = "pending" 8 + RepoStatePending RepoState = "pending" 9 9 RepoStateBackfilling RepoState = "backfilling" 10 - RepoStateActive RepoState = "active" 11 - RepoStateError RepoState = "error" 10 + RepoStateActive RepoState = "active" 11 + RepoStateError RepoState = "error" 12 12 ) 13 13 14 14 type FilterDid struct { 15 15 Did string `gorm:"primaryKey"` 16 16 State RepoState `gorm:"not null;default:'pending'"` 17 - Rev string `gorm:"type:text"` // Latest known revision 18 - ErrorMsg string `gorm:"type:text"` // Error message if state is error 17 + Rev string `gorm:"type:text"` 18 + ErrorMsg string `gorm:"type:text"` 19 19 CreatedAt time.Time `gorm:"not null"` 20 20 UpdatedAt time.Time `gorm:"not null"` 21 21 } ··· 29 29 Cid string `gorm:"type:text"` 30 30 Record string `gorm:"type:text"` 31 31 } 32 + 33 + type RepoRecord struct { 34 + Did string `gorm:"primaryKey"` 35 + Collection string `gorm:"primaryKey"` 36 + Rkey string `gorm:"primaryKey"` 37 + Cid string `gorm:"not null"` 38 + Rev string `gorm:"not null"` 39 + UpdatedAt time.Time 40 + }
+4 -12
nexus/nexus.go
··· 18 18 echo *echo.Echo 19 19 logger *slog.Logger 20 20 21 - filterDids map[string]bool // DID -> exists (for quick filtering) 22 - backfillBuffer map[string][]*Op // DID -> buffered ops during backfill 23 - mu sync.RWMutex 21 + filterDids map[string]bool // DID -> exists (for quick filtering) 22 + mu sync.RWMutex 24 23 25 24 // for signature verification 26 25 Dir identity.Directory ··· 42 41 } 43 42 44 43 func NewNexus(config NexusConfig) (*Nexus, error) { 45 - // Open SQLite DB with GORM 46 44 db, err := gorm.Open(sqlite.Open(config.DBPath), &gorm.Config{}) 47 45 if err != nil { 48 46 return nil, err 49 47 } 50 48 51 - // Auto-migrate the schema 52 - if err := db.AutoMigrate(&models.BufferedEvt{}, &models.FilterDid{}); err != nil { 49 + if err := db.AutoMigrate(&models.BufferedEvt{}, &models.FilterDid{}, &models.RepoRecord{}); err != nil { 53 50 return nil, err 54 51 } 55 52 56 - // Create Echo instance 57 53 e := echo.New() 58 54 e.HideBanner = true 59 55 60 - // main thing is skipping handle verification 61 56 bdir := identity.BaseDirectory{ 62 57 SkipHandleVerification: true, 63 58 TryAuthoritativeDNS: false, ··· 70 65 echo: e, 71 66 logger: slog.Default().With("system", "nexus"), 72 67 73 - filterDids: make(map[string]bool), 74 - backfillBuffer: make(map[string][]*Op), 68 + filterDids: make(map[string]bool), 75 69 76 70 Dir: &cdir, 77 71 ··· 83 77 return nil, err 84 78 } 85 79 86 - // Register routes 87 80 n.registerRoutes() 88 81 89 82 return n, nil ··· 123 116 for _, f := range filterDids { 124 117 n.filterDids[f.Did] = true 125 118 126 - // Resume backfill for any repos in pending or backfilling state 127 119 if f.State == models.RepoStatePending || f.State == models.RepoStateBackfilling { 128 120 go n.backfillDid(context.Background(), f.Did) 129 121 }
-7
nexus/outbox.go
··· 23 23 } 24 24 } 25 25 26 - // Subscribe drains buffered events from DB, then streams live events from channel 27 - // The send function is called for each event (e.g., ws.WriteJSON) 28 26 func (o *Outbox) Subscribe(ctx context.Context, send func(*Op) error) error { 29 - // 1. Load and drain buffered events from DB first 30 27 var bufferedEvts []models.BufferedEvt 31 28 if err := o.db.Order("id ASC").Find(&bufferedEvts).Error; err != nil { 32 29 o.logger.Error("failed to load buffered events", "error", err) ··· 59 56 } 60 57 } 61 58 62 - // Delete drained events 63 59 if err := o.db.Delete(&bufferedEvts).Error; err != nil { 64 60 o.logger.Error("failed to delete buffered events", "error", err) 65 61 } else { ··· 67 63 } 68 64 } 69 65 70 - // 2. Stream live events from channel 71 66 o.logger.Info("starting live event stream") 72 67 for { 73 68 select { ··· 82 77 } 83 78 } 84 79 85 - // Send attempts to deliver event via channel, falls back to DB if channel is full or blocked 86 80 func (o *Outbox) Send(op *Op) error { 87 81 select { 88 82 case o.outCh <- op: 89 83 return nil 90 84 default: 91 - // Channel full or no readers, persist to DB 92 85 return o.bufferToDB(op) 93 86 } 94 87 }