this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

backfill & better outbox

dholms a88d55fe 72ea73b0

+384 -138
+173
nexus/backfill.go
··· 1 + package main 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "fmt" 7 + "io" 8 + "net/http" 9 + 10 + comatproto "github.com/bluesky-social/indigo/api/atproto" 11 + "github.com/bluesky-social/indigo/atproto/data" 12 + "github.com/bluesky-social/indigo/atproto/syntax" 13 + "github.com/bluesky-social/indigo/nexus/models" 14 + "github.com/bluesky-social/indigo/repo" 15 + "github.com/bluesky-social/indigo/xrpc" 16 + "github.com/ipfs/go-cid" 17 + ) 18 + 19 + func (n *Nexus) backfillDid(ctx context.Context, did string) { 20 + // Mark as backfilling 21 + if err := n.UpdateRepoState(did, models.RepoStateBackfilling, "", ""); err != nil { 22 + n.logger.Error("failed to update state to backfilling", "error", err, "did", did) 23 + return 24 + } 25 + 26 + n.logger.Info("starting backfill", "did", did) 27 + 28 + // Perform backfill 29 + rev, err := n.backfillRepo(ctx, did) 30 + if err != nil { 31 + n.logger.Error("backfill failed", "error", err, "did", did) 32 + // Mark as error state 33 + if updateErr := n.UpdateRepoState(did, models.RepoStateError, "", err.Error()); updateErr != nil { 34 + n.logger.Error("failed to update state to error", "error", updateErr, "did", did) 35 + } 36 + // Clean up buffered events 37 + n.mu.Lock() 38 + delete(n.backfillBuffer, did) 39 + n.mu.Unlock() 40 + return 41 + } 42 + 43 + // Drain buffered events that came in during backfill 44 + if err := n.drainBackfillBuffer(ctx, did); err != nil { 45 + n.logger.Error("failed to drain backfill buffer", "error", err, "did", did) 46 + if updateErr := n.UpdateRepoState(did, models.RepoStateError, rev, err.Error()); updateErr != nil { 47 + n.logger.Error("failed to update state to error", "error", updateErr, "did", did) 48 + } 49 + return 50 + } 51 + 52 + // Mark as active 53 + if err := n.UpdateRepoState(did, models.RepoStateActive, rev, ""); err != nil { 54 + n.logger.Error("failed to update state to active", "error", err, "did", did) 55 + return 56 + } 57 + 58 + n.logger.Info("backfill complete", "did", did, "rev", rev) 59 + } 60 + 61 + func (n *Nexus) drainBackfillBuffer(ctx context.Context, did string) error { 62 + // Get buffered events from memory 63 + n.mu.Lock() 64 + bufferedOps := n.backfillBuffer[did] 65 + delete(n.backfillBuffer, did) 66 + n.mu.Unlock() 67 + 68 + if len(bufferedOps) == 0 { 69 + return nil 70 + } 71 + 72 + n.logger.Info("draining backfill buffer from memory", "did", did, "count", len(bufferedOps)) 73 + 74 + // Send each buffered event 75 + for _, op := range bufferedOps { 76 + if err := n.outbox.Send(op); err != nil { 77 + return err 78 + } 79 + } 80 + 81 + n.logger.Info("drained backfill buffer", "did", did, "count", len(bufferedOps)) 82 + return nil 83 + } 84 + 85 + func (n *Nexus) backfillRepo(ctx context.Context, did string) (string, error) { 86 + // Resolve DID to PDS 87 + ident, err := n.Dir.LookupDID(ctx, syntax.DID(did)) 88 + if err != nil { 89 + return "", fmt.Errorf("failed to resolve DID: %w", err) 90 + } 91 + 92 + pdsHost := ident.PDSEndpoint() 93 + if pdsHost == "" { 94 + return "", fmt.Errorf("no PDS endpoint for DID: %s", did) 95 + } 96 + 97 + n.logger.Info("fetching repo from PDS", "did", did, "pds", pdsHost) 98 + 99 + // Create XRPC client 100 + client := &xrpc.Client{ 101 + Client: &http.Client{}, 102 + Host: pdsHost, 103 + } 104 + 105 + // Call com.atproto.sync.getRepo 106 + repoBytes, err := comatproto.SyncGetRepo(ctx, client, did, "") 107 + if err != nil { 108 + return "", fmt.Errorf("failed to get repo: %w", err) 109 + } 110 + 111 + n.logger.Info("parsing repo CAR", "did", did, "size", len(repoBytes)) 112 + 113 + // Parse the repo from CAR 114 + r, err := repo.ReadRepoFromCar(ctx, io.NopCloser(bytes.NewReader(repoBytes))) 115 + if err != nil { 116 + return "", fmt.Errorf("failed to read repo from CAR: %w", err) 117 + } 118 + 119 + rev := r.SignedCommit().Rev 120 + n.logger.Info("iterating repo records", "did", did, "rev", rev) 121 + 122 + numRecords := 0 123 + 124 + // Iterate through all records 125 + err = r.ForEach(ctx, "", func(recordPath string, nodeCid cid.Cid) error { 126 + // Get the record bytes 127 + blk, err := r.Blockstore().Get(ctx, nodeCid) 128 + if err != nil { 129 + n.logger.Error("failed to get block", "path", recordPath, "error", err) 130 + return nil // Skip this record 131 + } 132 + 133 + raw := blk.RawData() 134 + 135 + // Unmarshal to get the actual record 136 + rec, err := data.UnmarshalCBOR(raw) 137 + if err != nil { 138 + n.logger.Error("failed to unmarshal record", "path", recordPath, "error", err) 139 + return nil 140 + } 141 + 142 + // Parse the path to get collection and rkey 143 + collection, rkey, err := syntax.ParseRepoPath(recordPath) 144 + if err != nil { 145 + n.logger.Error("invalid record path", "path", recordPath, "error", err) 146 + return nil 147 + } 148 + 149 + // Send as create event 150 + op := &Op{ 151 + Did: did, 152 + Collection: collection.String(), 153 + Rkey: rkey.String(), 154 + Action: "create", 155 + Record: rec, 156 + Cid: nodeCid.String(), 157 + } 158 + 159 + if err := n.outbox.Send(op); err != nil { 160 + return fmt.Errorf("failed to send op: %w", err) 161 + } 162 + 163 + numRecords++ 164 + return nil 165 + }) 166 + 167 + if err != nil { 168 + return "", fmt.Errorf("failed to iterate repo: %w", err) 169 + } 170 + 171 + n.logger.Info("backfill repo complete", "did", did, "records", numRecords, "rev", rev) 172 + return rev, nil 173 + }
+37 -11
nexus/firehose.go
··· 13 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 14 "github.com/bluesky-social/indigo/events" 15 15 "github.com/bluesky-social/indigo/events/schedulers/parallel" 16 + "github.com/bluesky-social/indigo/nexus/models" 16 17 "github.com/gorilla/websocket" 17 18 ) 18 19 ··· 31 32 u.Scheme = "wss" 32 33 } 33 34 u.Path = "xrpc/com.atproto.sync.subscribeRepos" 34 - // if cmd.IsSet("cursor") { 35 - // u.RawQuery = fmt.Sprintf("cursor=%d", cmd.Int("cursor")) 36 - // } 37 35 urlString := u.String() 38 - con, _, err := dialer.Dial(urlString, http.Header{ 39 - // "User-Agent": []string{*userAgent()}, 40 - }) 36 + con, _, err := dialer.Dial(urlString, http.Header{}) 41 37 if err != nil { 42 38 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 43 39 } ··· 73 69 } 74 70 75 71 func (nexus *Nexus) handleCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 76 - if _, exists := nexus.filterDids[evt.Repo]; !exists { 72 + nexus.mu.RLock() 73 + exists := nexus.filterDids[evt.Repo] 74 + nexus.mu.RUnlock() 75 + 76 + if !exists { 77 77 return nil 78 78 } 79 + 80 + // Check repo state from database 81 + state, err := nexus.GetRepoState(evt.Repo) 82 + if err != nil { 83 + nexus.logger.Error("failed to get repo state", "did", evt.Repo, "error", err) 84 + return nil // Don't fail on state lookup errors 85 + } 86 + 79 87 r, err := repo.VerifyCommitMessage(ctx, evt) 80 88 if err != nil { 81 - nexus.logger.Info("failed to invert commit MST", "err", err) 89 + nexus.logger.Info("failed to verify commit", "did", evt.Repo, "err", err) 82 90 return err 83 91 } 92 + 84 93 for _, op := range evt.Ops { 85 94 coll, rkey, err := syntax.ParseRepoPath(op.Path) 86 95 if err != nil { ··· 108 117 outOp.Cid = recCid.String() 109 118 } 110 119 111 - err = nexus.outbox.Send(outOp) 112 - if err != nil { 113 - return err 120 + // If backfill is in progress, buffer to memory 121 + if state == models.RepoStatePending || state == models.RepoStateBackfilling { 122 + nexus.logger.Debug("buffering event to memory during backfill", "did", evt.Repo, "state", state) 123 + nexus.mu.Lock() 124 + nexus.backfillBuffer[evt.Repo] = append(nexus.backfillBuffer[evt.Repo], outOp) 125 + nexus.mu.Unlock() 126 + } else { 127 + // Normal flow - send through outbox 128 + err = nexus.outbox.Send(outOp) 129 + if err != nil { 130 + return err 131 + } 132 + } 133 + } 134 + 135 + // Update rev if repo is active 136 + if state == models.RepoStateActive { 137 + if err := nexus.UpdateRepoState(evt.Repo, models.RepoStateActive, evt.Rev, ""); err != nil { 138 + nexus.logger.Error("failed to update rev", "did", evt.Repo, "error", err) 114 139 } 115 140 } 141 + 116 142 return nil 117 143 }
+48 -68
nexus/handlers.go
··· 1 1 package main 2 2 3 3 import ( 4 - "encoding/json" 4 + "context" 5 5 "net/http" 6 6 7 7 "github.com/bluesky-social/indigo/nexus/models" 8 8 "github.com/gorilla/websocket" 9 9 "github.com/labstack/echo/v4" 10 - "gorm.io/gorm/clause" 11 10 ) 12 11 13 12 var upgrader = websocket.Upgrader{ ··· 29 28 } 30 29 31 30 func (n *Nexus) handleListen(c echo.Context) error { 32 - // Check if already connected 33 - if err := n.outbox.Connect(); err != nil { 34 - n.logger.Error("connection refused", "error", err) 35 - return echo.NewHTTPError(http.StatusConflict, "websocket already connected") 36 - } 37 - defer n.outbox.Disconnect() 38 - 39 31 // Upgrade to WebSocket 40 32 ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil) 41 33 if err != nil { ··· 45 37 46 38 n.logger.Info("websocket connected") 47 39 48 - // Send buffered events first 49 - var bufferedEvts []models.BufferedEvt 50 - if err := n.db.Order("id ASC").Find(&bufferedEvts).Error; err != nil { 51 - n.logger.Error("failed to load buffered events", "error", err) 52 - return err 53 - } 54 - 55 - if len(bufferedEvts) > 0 { 56 - n.logger.Info("draining buffered events", "count", len(bufferedEvts)) 57 - for _, evt := range bufferedEvts { 58 - op := &Op{ 59 - Did: evt.Did, 60 - Collection: evt.Collection, 61 - Rkey: evt.Rkey, 62 - Action: evt.Action, 63 - Cid: evt.Cid, 64 - } 65 - 66 - if evt.Record != "" { 67 - var record map[string]interface{} 68 - if err := json.Unmarshal([]byte(evt.Record), &record); err != nil { 69 - n.logger.Error("failed to unmarshal record", "error", err, "id", evt.ID) 70 - continue 71 - } 72 - op.Record = record 73 - } 74 - 75 - if err := ws.WriteJSON(op); err != nil { 76 - n.logger.Info("websocket write error", "error", err) 77 - return nil 78 - } 79 - } 80 - 81 - // Delete buffered events 82 - if err := n.db.Delete(&bufferedEvts).Error; err != nil { 83 - n.logger.Error("failed to delete buffered events", "error", err) 84 - } else { 85 - n.logger.Info("cleared buffered events", "count", len(bufferedEvts)) 86 - } 87 - } 88 - 89 - // Mark that we're done draining and ready for live events 90 - n.outbox.StartLive() 91 - n.logger.Info("starting live event stream") 92 - for op := range n.outbox.outCh { 93 - if err := ws.WriteJSON(op); err != nil { 94 - n.logger.Info("websocket write error", "error", err) 95 - return nil 96 - } 97 - } 98 - return nil 40 + // Subscribe to outbox - it handles draining DB and streaming live events 41 + return n.outbox.Subscribe(c.Request().Context(), func(op *Op) error { 42 + return ws.WriteJSON(op) 43 + }) 99 44 } 100 45 101 46 type DidPayload struct { ··· 108 53 return err 109 54 } 110 55 111 - rows := make([]models.FilterDid, 0, len(payload.DIDs)) 56 + // Start transaction 57 + tx := n.db.Begin() 58 + defer tx.Rollback() 59 + 60 + var newDids []string 61 + 112 62 for _, did := range payload.DIDs { 113 - rows = append(rows, models.FilterDid{Did: did}) 63 + // Check if already exists 64 + var existing models.FilterDid 65 + err := tx.First(&existing, "did = ?", did).Error 66 + 67 + if err == nil { 68 + // Already exists, skip 69 + n.logger.Info("did already tracked", "did", did, "state", existing.State) 70 + continue 71 + } 72 + 73 + // Add to filter list with pending state 74 + filterDid := &models.FilterDid{ 75 + Did: did, 76 + State: models.RepoStatePending, 77 + } 78 + if err := tx.Create(filterDid).Error; err != nil { 79 + n.logger.Error("failed to insert did", "error", err, "did", did) 80 + return echo.NewHTTPError(http.StatusInternalServerError) 81 + } 82 + 83 + // Add to in-memory filter 84 + n.mu.Lock() 85 + n.filterDids[did] = true 86 + n.mu.Unlock() 87 + 88 + newDids = append(newDids, did) 114 89 } 115 90 116 - err := n.db.Clauses(clause.OnConflict{DoNothing: true}).Create(&rows).Error 117 - if err != nil { 118 - n.logger.Error("failed to insert dids", "error", err) 91 + if err := tx.Commit().Error; err != nil { 92 + n.logger.Error("failed to commit transaction", "error", err) 119 93 return echo.NewHTTPError(http.StatusInternalServerError) 120 94 } 121 95 122 - for _, did := range payload.DIDs { 123 - n.filterDids[did] = true 96 + // Kick off backfill for each new DID 97 + for _, did := range newDids { 98 + go n.backfillDid(context.Background(), did) 124 99 } 125 100 126 - return c.NoContent(http.StatusOK) 101 + n.logger.Info("added dids and started backfills", "new", len(newDids), "total", len(payload.DIDs)) 102 + 103 + return c.JSON(http.StatusOK, map[string]interface{}{ 104 + "added": len(newDids), 105 + "total": len(payload.DIDs), 106 + }) 127 107 }
+17 -1
nexus/models/models.go
··· 1 1 package models 2 2 3 + import "time" 4 + 5 + type RepoState string 6 + 7 + const ( 8 + RepoStatePending RepoState = "pending" 9 + RepoStateBackfilling RepoState = "backfilling" 10 + RepoStateActive RepoState = "active" 11 + RepoStateError RepoState = "error" 12 + ) 13 + 3 14 type FilterDid struct { 4 - Did string `gorm:"primaryKey"` 15 + Did string `gorm:"primaryKey"` 16 + State RepoState `gorm:"not null;default:'pending'"` 17 + Rev string `gorm:"type:text"` // Latest known revision 18 + ErrorMsg string `gorm:"type:text"` // Error message if state is error 19 + CreatedAt time.Time `gorm:"not null"` 20 + UpdatedAt time.Time `gorm:"not null"` 5 21 } 6 22 7 23 type BufferedEvt struct {
+44 -2
nexus/nexus.go
··· 3 3 import ( 4 4 "context" 5 5 "log/slog" 6 + "sync" 7 + "time" 6 8 9 + "github.com/bluesky-social/indigo/atproto/identity" 7 10 "github.com/bluesky-social/indigo/nexus/models" 8 11 "github.com/labstack/echo/v4" 9 12 "gorm.io/driver/sqlite" ··· 15 18 echo *echo.Echo 16 19 logger *slog.Logger 17 20 18 - filterDids map[string]bool 21 + filterDids map[string]bool // DID -> exists (for quick filtering) 22 + backfillBuffer map[string][]*Op // DID -> buffered ops during backfill 23 + mu sync.RWMutex 24 + 25 + // for signature verification 26 + Dir identity.Directory 19 27 20 28 outbox *Outbox 21 29 } ··· 49 57 e := echo.New() 50 58 e.HideBanner = true 51 59 60 + // main thing is skipping handle verification 61 + bdir := identity.BaseDirectory{ 62 + SkipHandleVerification: true, 63 + TryAuthoritativeDNS: false, 64 + SkipDNSDomainSuffixes: []string{".bsky.social"}, 65 + } 66 + cdir := identity.NewCacheDirectory(&bdir, 1_000_000, time.Hour*24, time.Minute*2, time.Minute*5) 67 + 52 68 n := &Nexus{ 53 69 db: db, 54 70 echo: e, 55 71 logger: slog.Default().With("system", "nexus"), 56 72 57 - filterDids: make(map[string]bool), 73 + filterDids: make(map[string]bool), 74 + backfillBuffer: make(map[string][]*Op), 75 + 76 + Dir: &cdir, 58 77 59 78 outbox: NewOutbox(db), 60 79 } ··· 103 122 104 123 for _, f := range filterDids { 105 124 n.filterDids[f.Did] = true 125 + 126 + // Resume backfill for any repos in pending or backfilling state 127 + if f.State == models.RepoStatePending || f.State == models.RepoStateBackfilling { 128 + go n.backfillDid(context.Background(), f.Did) 129 + } 106 130 } 107 131 108 132 return nil 109 133 } 134 + 135 + func (n *Nexus) GetRepoState(did string) (models.RepoState, error) { 136 + var filterDid models.FilterDid 137 + if err := n.db.First(&filterDid, "did = ?", did).Error; err != nil { 138 + return "", err 139 + } 140 + return filterDid.State, nil 141 + } 142 + 143 + func (n *Nexus) UpdateRepoState(did string, state models.RepoState, rev string, errorMsg string) error { 144 + return n.db.Model(&models.FilterDid{}). 145 + Where("did = ?", did). 146 + Updates(map[string]interface{}{ 147 + "state": state, 148 + "rev": rev, 149 + "error_msg": errorMsg, 150 + }).Error 151 + }
+65 -56
nexus/outbox.go
··· 1 1 package main 2 2 3 3 import ( 4 + "context" 4 5 "encoding/json" 5 - "errors" 6 - "sync" 6 + "log/slog" 7 7 8 8 "github.com/bluesky-social/indigo/nexus/models" 9 9 "gorm.io/gorm" 10 10 ) 11 11 12 - var ErrAlreadyConnected = errors.New("websocket already connected") 13 - 14 12 type Outbox struct { 15 - db *gorm.DB 16 - outCh chan *Op 17 - mu sync.RWMutex 18 - connected bool 19 - drainingBuf bool 13 + db *gorm.DB 14 + outCh chan *Op 15 + logger *slog.Logger 20 16 } 21 17 22 18 func NewOutbox(db *gorm.DB) *Outbox { 23 19 return &Outbox{ 24 - db: db, 25 - outCh: make(chan *Op, 100), 26 - connected: false, 27 - drainingBuf: false, 20 + db: db, 21 + outCh: make(chan *Op, 100), 22 + logger: slog.Default().With("system", "outbox"), 28 23 } 29 24 } 30 25 31 - func (o *Outbox) Connect() error { 32 - o.mu.Lock() 33 - defer o.mu.Unlock() 34 - 35 - if o.connected { 36 - return ErrAlreadyConnected 26 + // Subscribe drains buffered events from DB, then streams live events from channel 27 + // The send function is called for each event (e.g., ws.WriteJSON) 28 + func (o *Outbox) Subscribe(ctx context.Context, send func(*Op) error) error { 29 + // 1. Load and drain buffered events from DB first 30 + var bufferedEvts []models.BufferedEvt 31 + if err := o.db.Order("id ASC").Find(&bufferedEvts).Error; err != nil { 32 + o.logger.Error("failed to load buffered events", "error", err) 33 + return err 37 34 } 38 35 39 - o.connected = true 40 - o.drainingBuf = true 41 - return nil 42 - } 36 + if len(bufferedEvts) > 0 { 37 + o.logger.Info("draining buffered events", "count", len(bufferedEvts)) 38 + for _, evt := range bufferedEvts { 39 + op := &Op{ 40 + Did: evt.Did, 41 + Collection: evt.Collection, 42 + Rkey: evt.Rkey, 43 + Action: evt.Action, 44 + Cid: evt.Cid, 45 + } 43 46 44 - func (o *Outbox) StartLive() { 45 - o.mu.Lock() 46 - defer o.mu.Unlock() 47 - o.drainingBuf = false 48 - } 47 + if evt.Record != "" { 48 + var record map[string]interface{} 49 + if err := json.Unmarshal([]byte(evt.Record), &record); err != nil { 50 + o.logger.Error("failed to unmarshal record", "error", err, "id", evt.ID) 51 + continue 52 + } 53 + op.Record = record 54 + } 49 55 50 - func (o *Outbox) Disconnect() { 51 - o.mu.Lock() 52 - defer o.mu.Unlock() 53 - o.connected = false 54 - o.drainingBuf = false 55 - } 56 - 57 - func (o *Outbox) IsConnected() bool { 58 - o.mu.RLock() 59 - defer o.mu.RUnlock() 60 - return o.connected 61 - } 62 - 63 - func (o *Outbox) Send(op *Op) error { 64 - o.mu.RLock() 65 - connected := o.connected 66 - drainingBuf := o.drainingBuf 67 - o.mu.RUnlock() 56 + if err := send(op); err != nil { 57 + o.logger.Info("send error during drain", "error", err) 58 + return err 59 + } 60 + } 68 61 69 - // If connected but still draining buffered events, buffer to DB 70 - if drainingBuf { 71 - return o.bufferToDB(op) 62 + // Delete drained events 63 + if err := o.db.Delete(&bufferedEvts).Error; err != nil { 64 + o.logger.Error("failed to delete buffered events", "error", err) 65 + } else { 66 + o.logger.Info("cleared buffered events", "count", len(bufferedEvts)) 67 + } 72 68 } 73 69 74 - if connected { 70 + // 2. Stream live events from channel 71 + o.logger.Info("starting live event stream") 72 + for { 75 73 select { 76 - case o.outCh <- op: 77 - return nil 78 - default: 79 - // Channel full, buffer to DB 80 - return o.bufferToDB(op) 74 + case <-ctx.Done(): 75 + return ctx.Err() 76 + case op := <-o.outCh: 77 + if err := send(op); err != nil { 78 + o.logger.Info("send error during live stream", "error", err) 79 + return err 80 + } 81 81 } 82 - } else { 82 + } 83 + } 84 + 85 + // Send attempts to deliver event via channel, falls back to DB if channel is full or blocked 86 + func (o *Outbox) Send(op *Op) error { 87 + select { 88 + case o.outCh <- op: 89 + return nil 90 + default: 91 + // Channel full or no readers, persist to DB 83 92 return o.bufferToDB(op) 84 93 } 85 94 }