this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

big old refactor

dholms b0a38708 9c43382b

+373 -344
+25 -54
nexus/backfill.go
··· 17 17 ) 18 18 19 19 func (n *Nexus) backfillDid(ctx context.Context, did string) error { 20 - if err := n.UpdateRepoState(did, models.RepoStateBackfilling, "", ""); err != nil { 20 + if err := n.db.Model(&models.Did{}). 21 + Where("did = ?", did). 22 + Updates(map[string]interface{}{ 23 + "state": models.RepoStateBackfilling, 24 + "rev": "", 25 + "error_msg": "", 26 + }).Error; err != nil { 21 27 return fmt.Errorf("failed to update state to backfilling: %w", err) 22 28 } 23 29 24 30 n.logger.Info("starting backfill", "did", did) 25 31 26 - rev, err := n.backfillRepo(ctx, did) 32 + rev, err := n.doBackfill(ctx, did) 27 33 if err != nil { 28 - n.UpdateRepoState(did, models.RepoStateError, "", err.Error()) 34 + n.db.Model(&models.Did{}). 35 + Where("did = ?", did). 36 + Updates(map[string]interface{}{ 37 + "state": models.RepoStateError, 38 + "rev": "", 39 + "error_msg": err.Error(), 40 + }) 29 41 return err 30 42 } 31 43 32 - if err := n.UpdateRepoState(did, models.RepoStateActive, rev, ""); err != nil { 44 + if err := n.db.Model(&models.Did{}). 45 + Where("did = ?", did). 46 + Updates(map[string]interface{}{ 47 + "state": models.RepoStateActive, 48 + "rev": rev, 49 + "error_msg": "", 50 + }).Error; err != nil { 33 51 return fmt.Errorf("failed to update state to active %w", err) 34 52 } 35 53 36 - if err := n.processBufferedEvents(ctx, did); err != nil { 37 - n.logger.Error("failed to process buffered events", "did", did, "error", err) 54 + if err := n.EventProcessor.drainBackfillBuffer(ctx, did); err != nil { 55 + n.logger.Error("failed to drain backfill buffer events", "did", did, "error", err) 38 56 } 39 57 40 58 return nil 41 59 } 42 60 43 - func (n *Nexus) backfillRepo(ctx context.Context, did string) (string, error) { 61 + func (n *Nexus) doBackfill(ctx context.Context, did string) (string, error) { 44 62 // Resolve DID to PDS 45 63 ident, err := n.Dir.LookupDID(ctx, syntax.DID(did)) 46 64 if err != nil { ··· 139 157 return fmt.Errorf("failed to send op: %w", err) 140 158 } 141 159 142 - // Update RepoRecord table with new CID 143 160 repoRecord := models.RepoRecord{ 144 161 Did: did, 145 162 Collection: collStr, 146 163 Rkey: rkeyStr, 147 164 Cid: cidStr, 148 - Rev: rev, 149 165 } 150 166 if err := n.db.Save(&repoRecord).Error; err != nil { 151 167 n.logger.Error("failed to save repo record", "error", err, "did", did, "path", recPath) ··· 162 178 n.logger.Info("backfill repo complete", "did", did, "records", numRecords, "rev", rev) 163 179 return rev, nil 164 180 } 165 - 166 - func (n *Nexus) processBufferedEvents(ctx context.Context, did string) error { 167 - var bufferedEvts []models.BackfillBuffer 168 - if err := n.db.Where("did = ?", did).Order("id ASC").Find(&bufferedEvts).Error; err != nil { 169 - return fmt.Errorf("failed to load buffered events: %w", err) 170 - } 171 - 172 - if len(bufferedEvts) == 0 { 173 - return nil 174 - } 175 - 176 - n.logger.Info("processing buffered backfill events", "did", did, "count", len(bufferedEvts)) 177 - 178 - for _, evt := range bufferedEvts { 179 - op := &Op{ 180 - Did: evt.Did, 181 - Collection: evt.Collection, 182 - Rkey: evt.Rkey, 183 - Action: evt.Action, 184 - Cid: evt.Cid, 185 - } 186 - 187 - if err := n.outbox.Send(op); err != nil { 188 - return fmt.Errorf("failed to send buffered event: %w", err) 189 - } 190 - 191 - repoRecord := models.RepoRecord{ 192 - Did: evt.Did, 193 - Collection: evt.Collection, 194 - Rkey: evt.Rkey, 195 - Cid: evt.Cid, 196 - Rev: evt.Rev, 197 - } 198 - if err := n.db.Save(&repoRecord).Error; err != nil { 199 - n.logger.Error("failed to save repo record from buffered event", "did", evt.Did, "error", err) 200 - } 201 - } 202 - 203 - if err := n.db.Where("did = ?", did).Delete(&models.BackfillBuffer{}).Error; err != nil { 204 - n.logger.Error("failed to delete buffered backfill events", "did", did, "error", err) 205 - } 206 - 207 - n.logger.Info("processed buffered backfill events", "did", did, "count", len(bufferedEvts)) 208 - return nil 209 - }
+9 -201
nexus/firehose.go
··· 6 6 "log/slog" 7 7 "net/http" 8 8 "net/url" 9 - "sync/atomic" 10 9 11 - comatproto "github.com/bluesky-social/indigo/api/atproto" 12 - "github.com/bluesky-social/indigo/atproto/data" 13 - "github.com/bluesky-social/indigo/atproto/repo" 14 - "github.com/bluesky-social/indigo/atproto/syntax" 15 10 "github.com/bluesky-social/indigo/events" 16 11 "github.com/bluesky-social/indigo/events/schedulers/parallel" 17 - "github.com/bluesky-social/indigo/nexus/models" 18 12 "github.com/gorilla/websocket" 19 - "gorm.io/gorm" 20 13 ) 21 14 22 - type Op struct { 23 - Did string `json:"did"` 24 - Collection string `json:"collection"` 25 - Rkey string `json:"rkey"` 26 - Action string `json:"action"` 27 - Record map[string]interface{} `json:"record,omitempty"` 28 - Cid string `json:"cid,omitempty"` 29 - } 30 - 31 15 type FirehoseConsumer struct { 32 - RelayHost string 33 - Filter *StringSet 34 - Logger *slog.Logger 35 - DB *gorm.DB 36 - Parallelism int 37 - PersistCursorEvery int 38 - 39 - OnEvent func(context.Context, *Op) error 16 + RelayHost string 17 + Cursor int64 18 + Logger *slog.Logger 19 + Parallelism int 20 + Callbacks *events.RepoStreamCallbacks 40 21 } 41 22 42 23 func (fc *FirehoseConsumer) Run(ctx context.Context) error { 43 - cur, err := fc.readLastCursor(ctx) 44 - if err != nil { 45 - return err 46 - } 47 - 48 24 dialer := websocket.DefaultDialer 49 25 u, err := url.Parse(fc.RelayHost) 50 26 if err != nil { ··· 57 33 u.Scheme = "wss" 58 34 } 59 35 u.Path = "xrpc/com.atproto.sync.subscribeRepos" 60 - if cur != 0 { 61 - u.RawQuery = fmt.Sprintf("cursor=%d", cur) 36 + if fc.Cursor != 0 { 37 + u.RawQuery = fmt.Sprintf("cursor=%d", fc.Cursor) 62 38 } 63 39 urlString := u.String() 64 - fc.Logger.Info("subscribing to firehose", "relayHost", fc.RelayHost, "cursor", cur) 40 + fc.Logger.Info("subscribing to firehose", "relayHost", fc.RelayHost, "cursor", fc.Cursor) 65 41 con, _, err := dialer.Dial(urlString, http.Header{}) 66 42 if err != nil { 67 43 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 68 44 } 69 45 70 - var eventCount atomic.Uint64 71 - 72 - rsc := &events.RepoStreamCallbacks{ 73 - RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 74 - if fc.Filter.Contains(evt.Repo) { 75 - err := fc.handleCommitEvent(ctx, evt) 76 - if err != nil { 77 - return err 78 - } 79 - } 80 - 81 - if eventCount.Add(1)%uint64(fc.PersistCursorEvery) == 0 { 82 - if err := fc.persistCursor(ctx, evt.Seq); err != nil { 83 - fc.Logger.Error("failed to persist cursor", "seq", evt.Seq, "error", err) 84 - } 85 - } 86 - return nil 87 - }, 88 - } 89 - 90 46 scheduler := parallel.NewScheduler( 91 47 fc.Parallelism, 92 48 100, 93 49 fc.RelayHost, 94 - rsc.EventHandler, 50 + fc.Callbacks.EventHandler, 95 51 ) 96 52 return events.HandleRepoStream(ctx, con, scheduler, nil) 97 53 } 98 - 99 - func (fc *FirehoseConsumer) readLastCursor(ctx context.Context) (int64, error) { 100 - var cursor models.Cursor 101 - if err := fc.DB.Where("host = ?", fc.RelayHost).First(&cursor).Error; err != nil { 102 - if err == gorm.ErrRecordNotFound { 103 - fc.Logger.Info("no pre-existing cursor in database", "relayHost", fc.RelayHost) 104 - return 0, nil 105 - } 106 - return 0, err 107 - } 108 - return cursor.Cursor, nil 109 - } 110 - 111 - func (fc *FirehoseConsumer) persistCursor(ctx context.Context, seq int64) error { 112 - if seq <= 0 { 113 - return nil 114 - } 115 - 116 - cursor := models.Cursor{ 117 - Host: fc.RelayHost, 118 - Cursor: seq, 119 - } 120 - 121 - return fc.DB.Save(&cursor).Error 122 - } 123 - 124 - func (fc *FirehoseConsumer) getRepoState(did string) (models.RepoState, error) { 125 - var d models.Did 126 - if err := fc.DB.First(&d, "did = ?", did).Error; err != nil { 127 - return "", err 128 - } 129 - return d.State, nil 130 - } 131 - 132 - func (fc *FirehoseConsumer) updateRepoState(did string, state models.RepoState, rev string, errorMsg string) error { 133 - return fc.DB.Model(&models.Did{}). 134 - Where("did = ?", did). 135 - Updates(map[string]interface{}{ 136 - "state": state, 137 - "rev": rev, 138 - "error_msg": errorMsg, 139 - }).Error 140 - } 141 - 142 - func (fc *FirehoseConsumer) bufferCommitEvent(evt *comatproto.SyncSubscribeRepos_Commit) error { 143 - for _, op := range evt.Ops { 144 - bufferedEvt := models.BufferedEvt{ 145 - Did: evt.Repo, 146 - Collection: op.Path[:len(op.Path)-len(op.Path[len(op.Path)-1:])], // extract collection from path 147 - Rkey: op.Path[len(op.Path)-1:], // extract rkey from path 148 - Action: op.Action, 149 - Cid: op.Cid.String(), 150 - } 151 - if err := fc.DB.Create(&bufferedEvt).Error; err != nil { 152 - fc.Logger.Error("failed to buffer event", "did", evt.Repo, "path", op.Path, "error", err) 153 - } 154 - } 155 - return nil 156 - } 157 - 158 - func (fc *FirehoseConsumer) handleCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 159 - state, err := fc.getRepoState(evt.Repo) 160 - if err != nil { 161 - fc.Logger.Error("failed to get repo state", "did", evt.Repo, "error", err) 162 - return nil 163 - } 164 - 165 - if state == models.RepoStatePending { 166 - return nil 167 - } else if state == models.RepoStateBackfilling { 168 - return fc.bufferCommitEvent(evt) 169 - } 170 - 171 - r, err := repo.VerifyCommitMessage(ctx, evt) 172 - if err != nil { 173 - fc.Logger.Info("failed to verify commit", "did", evt.Repo, "error", err) 174 - return err 175 - } 176 - 177 - for _, op := range evt.Ops { 178 - coll, rkey, err := syntax.ParseRepoPath(op.Path) 179 - if err != nil { 180 - return err 181 - } 182 - 183 - collStr := coll.String() 184 - rkeyStr := rkey.String() 185 - 186 - if op.Action == "delete" { 187 - outOp := &Op{ 188 - Did: evt.Repo, 189 - Collection: collStr, 190 - Rkey: rkeyStr, 191 - Action: "delete", 192 - } 193 - 194 - if err := fc.OnEvent(ctx, outOp); err != nil { 195 - return err 196 - } 197 - 198 - if err := fc.DB.Where("did = ? AND collection = ? AND rkey = ?", evt.Repo, collStr, rkeyStr).Delete(&models.RepoRecord{}).Error; err != nil { 199 - fc.Logger.Error("failed to delete repo record", "did", evt.Repo, "path", op.Path, "error", err) 200 - } 201 - continue 202 - } 203 - 204 - recBytes, recCid, err := r.GetRecordBytes(ctx, coll, rkey) 205 - if err != nil { 206 - return err 207 - } 208 - cidStr := recCid.String() 209 - 210 - rec, err := data.UnmarshalCBOR(recBytes) 211 - if err != nil { 212 - return err 213 - } 214 - 215 - outOp := &Op{ 216 - Did: evt.Repo, 217 - Collection: collStr, 218 - Rkey: rkeyStr, 219 - Action: op.Action, 220 - Record: rec, 221 - Cid: cidStr, 222 - } 223 - 224 - if err := fc.OnEvent(ctx, outOp); err != nil { 225 - return err 226 - } 227 - 228 - repoRecord := models.RepoRecord{ 229 - Did: evt.Repo, 230 - Collection: collStr, 231 - Rkey: rkeyStr, 232 - Cid: cidStr, 233 - Rev: evt.Rev, 234 - } 235 - if err := fc.DB.Save(&repoRecord).Error; err != nil { 236 - fc.Logger.Error("failed to save repo record", "did", evt.Repo, "path", op.Path, "error", err) 237 - } 238 - } 239 - 240 - if err := fc.updateRepoState(evt.Repo, models.RepoStateActive, evt.Rev, ""); err != nil { 241 - fc.Logger.Error("failed to update rev", "did", evt.Repo, "error", err) 242 - } 243 - 244 - return nil 245 - }
+10 -27
nexus/models/models.go
··· 1 1 package models 2 2 3 - import "time" 4 - 5 3 type RepoState string 6 4 7 5 const ( ··· 12 10 ) 13 11 14 12 type Did struct { 15 - Did string `gorm:"primaryKey"` 16 - State RepoState `gorm:"not null;default:'pending'"` 17 - Rev string `gorm:"type:text"` 18 - ErrorMsg string `gorm:"type:text"` 19 - CreatedAt time.Time `gorm:"not null"` 20 - UpdatedAt time.Time `gorm:"not null"` 13 + Did string `gorm:"primaryKey"` 14 + State RepoState `gorm:"not null;default:'pending'"` 15 + Rev string `gorm:"type:text"` 16 + ErrorMsg string `gorm:"type:text"` 21 17 } 22 18 23 - type BufferedEvt struct { 24 - ID uint `gorm:"primaryKey"` 25 - Did string `gorm:"not null;index"` 26 - Collection string `gorm:"not null"` 27 - Rkey string `gorm:"not null"` 28 - Action string `gorm:"not null"` 29 - Cid string `gorm:"type:text"` 30 - Record string `gorm:"type:text"` 19 + type OutboxBuffer struct { 20 + ID uint `gorm:"primaryKey"` 21 + Data string `gorm:"type:text;not null"` // JSON-encoded operations 31 22 } 32 23 33 24 type BackfillBuffer struct { 34 - ID uint `gorm:"primaryKey"` 35 - Did string `gorm:"not null;index"` 36 - Collection string `gorm:"not null"` 37 - Rkey string `gorm:"not null"` 38 - Action string `gorm:"not null"` 39 - Cid string `gorm:"type:text"` 40 - Record string `gorm:"type:text"` 41 - Rev string `gorm:"not null"` 42 - CreatedAt time.Time `gorm:"not null"` 25 + ID uint `gorm:"primaryKey"` 26 + Did string `gorm:"not null;index"` 27 + Data string `gorm:"type:text;not null"` // JSON-encoded Commit 43 28 } 44 29 45 30 type RepoRecord struct { ··· 47 32 Collection string `gorm:"primaryKey"` 48 33 Rkey string `gorm:"primaryKey"` 49 34 Cid string `gorm:"not null"` 50 - Rev string `gorm:"not null"` 51 - UpdatedAt time.Time 52 35 } 53 36 54 37 type Cursor struct {
+41 -28
nexus/nexus.go
··· 5 5 "log/slog" 6 6 "time" 7 7 8 + comatproto "github.com/bluesky-social/indigo/api/atproto" 8 9 "github.com/bluesky-social/indigo/atproto/identity" 10 + "github.com/bluesky-social/indigo/events" 9 11 "github.com/bluesky-social/indigo/nexus/models" 10 12 "github.com/labstack/echo/v4" 11 13 "gorm.io/driver/sqlite" 12 14 "gorm.io/gorm" 15 + "gorm.io/gorm/logger" 13 16 ) 14 17 15 18 type Nexus struct { ··· 24 27 backfillQueue *BackfillQueue 25 28 26 29 FirehoseConsumer *FirehoseConsumer 30 + EventProcessor *EventProcessor 27 31 } 28 32 29 33 type NexusConfig struct { ··· 34 38 } 35 39 36 40 func NewNexus(config NexusConfig) (*Nexus, error) { 37 - db, err := gorm.Open(sqlite.Open(config.DBPath), &gorm.Config{}) 41 + db, err := gorm.Open(sqlite.Open(config.DBPath), &gorm.Config{ 42 + Logger: logger.Default.LogMode(logger.Silent), 43 + }) 38 44 if err != nil { 39 45 return nil, err 40 46 } 41 47 42 - if err := db.AutoMigrate(&models.BufferedEvt{}, &models.Did{}, &models.RepoRecord{}, &models.BackfillBuffer{}, &models.Cursor{}); err != nil { 48 + if err := db.AutoMigrate(&models.Did{}, &models.RepoRecord{}, &models.OutboxBuffer{}, &models.BackfillBuffer{}, &models.Cursor{}); err != nil { 43 49 return nil, err 44 50 } 45 51 ··· 75 81 persistCursorEvery = 100 76 82 } 77 83 78 - n.FirehoseConsumer = &FirehoseConsumer{ 79 - RelayHost: config.RelayHost, 80 - Filter: n.filter, 81 - Logger: n.logger.With("component", "firehose"), 84 + cursor, err := n.readLastCursor(context.Background(), config.RelayHost) 85 + if err != nil { 86 + return nil, err 87 + } 88 + 89 + n.EventProcessor = &EventProcessor{ 90 + Logger: n.logger.With("component", "processor"), 82 91 DB: db, 83 - Parallelism: parallelism, 92 + Dir: n.Dir, 84 93 PersistCursorEvery: persistCursorEvery, 85 - OnEvent: n.handleEvent, 94 + RelayHost: config.RelayHost, 95 + Outbox: n.outbox, 96 + } 97 + 98 + rsc := &events.RepoStreamCallbacks{ 99 + RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 100 + return n.EventProcessor.ProcessCommit(context.Background(), evt) 101 + }, 102 + } 103 + 104 + n.FirehoseConsumer = &FirehoseConsumer{ 105 + RelayHost: config.RelayHost, 106 + Cursor: cursor, 107 + Logger: n.logger.With("component", "firehose"), 108 + Parallelism: parallelism, 109 + Callbacks: rsc, 86 110 } 87 111 88 - // run 50 backfill workers 89 112 for i := 0; i < 50; i++ { 90 113 go n.runBackfillWorker(context.Background(), i) 91 114 } ··· 162 185 n.logger.Info("queued backfill", "did", did, "queue_depth", depth) 163 186 } 164 187 165 - func (n *Nexus) GetRepoState(did string) (models.RepoState, error) { 166 - var d models.Did 167 - if err := n.db.First(&d, "did = ?", did).Error; err != nil { 168 - return "", err 188 + func (n *Nexus) readLastCursor(ctx context.Context, relayHost string) (int64, error) { 189 + var cursor models.Cursor 190 + if err := n.db.Where("host = ?", relayHost).First(&cursor).Error; err != nil { 191 + if err == gorm.ErrRecordNotFound { 192 + n.logger.Info("no pre-existing cursor in database", "relayHost", relayHost) 193 + return 0, nil 194 + } 195 + return 0, err 169 196 } 170 - return d.State, nil 171 - } 172 - 173 - func (n *Nexus) UpdateRepoState(did string, state models.RepoState, rev string, errorMsg string) error { 174 - return n.db.Model(&models.Did{}). 175 - Where("did = ?", did). 176 - Updates(map[string]interface{}{ 177 - "state": state, 178 - "rev": rev, 179 - "error_msg": errorMsg, 180 - }).Error 181 - } 182 - 183 - func (n *Nexus) handleEvent(ctx context.Context, op *Op) error { 184 - return n.outbox.Send(op) 197 + return cursor.Cursor, nil 185 198 }
+13 -34
nexus/outbox.go
··· 23 23 } 24 24 } 25 25 26 - func (o *Outbox) Subscribe(ctx context.Context, send func(*Op) error) error { 27 - var bufferedEvts []models.BufferedEvt 26 + func (o *Outbox) Subscribe(ctx context.Context, send func(op *Op) error) error { 27 + var bufferedEvts []models.OutboxBuffer 28 28 if err := o.db.Order("id ASC").Find(&bufferedEvts).Error; err != nil { 29 29 o.logger.Error("failed to load buffered events", "error", err) 30 30 return err ··· 33 33 if len(bufferedEvts) > 0 { 34 34 o.logger.Info("draining buffered events", "count", len(bufferedEvts)) 35 35 for _, evt := range bufferedEvts { 36 - op := &Op{ 37 - Did: evt.Did, 38 - Collection: evt.Collection, 39 - Rkey: evt.Rkey, 40 - Action: evt.Action, 41 - Cid: evt.Cid, 36 + // Unmarshal buffered JSON back to Commit 37 + var op Op 38 + if err := json.Unmarshal([]byte(evt.Data), &op); err != nil { 39 + o.logger.Error("failed to unmarshal buffered op", "error", err, "id", evt.ID) 40 + continue 42 41 } 43 42 44 - if evt.Record != "" { 45 - var record map[string]interface{} 46 - if err := json.Unmarshal([]byte(evt.Record), &record); err != nil { 47 - o.logger.Error("failed to unmarshal record", "error", err, "id", evt.ID) 48 - continue 49 - } 50 - op.Record = record 51 - } 52 - 53 - if err := send(op); err != nil { 43 + if err := send(&op); err != nil { 54 44 o.logger.Info("send error during drain", "error", err) 55 45 return err 56 46 } ··· 87 77 } 88 78 89 79 func (o *Outbox) bufferToDB(op *Op) error { 90 - var recordJSON string 91 - if op.Record != nil { 92 - bytes, err := json.Marshal(op.Record) 93 - if err != nil { 94 - return err 95 - } 96 - recordJSON = string(bytes) 80 + jsonData, err := json.Marshal(op) 81 + if err != nil { 82 + return err 97 83 } 98 84 99 - err := o.db.Create(&models.BufferedEvt{ 100 - Did: op.Did, 101 - Collection: op.Collection, 102 - Rkey: op.Rkey, 103 - Action: op.Action, 104 - Cid: op.Cid, 105 - Record: recordJSON, 85 + return o.db.Create(&models.OutboxBuffer{ 86 + Data: string(jsonData), 106 87 }).Error 107 - 108 - return err 109 88 }
+275
nexus/processor.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "log/slog" 8 + "sync/atomic" 9 + 10 + comatproto "github.com/bluesky-social/indigo/api/atproto" 11 + "github.com/bluesky-social/indigo/atproto/data" 12 + "github.com/bluesky-social/indigo/atproto/identity" 13 + "github.com/bluesky-social/indigo/atproto/repo" 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + "github.com/bluesky-social/indigo/nexus/models" 16 + "gorm.io/gorm" 17 + ) 18 + 19 + type Commit struct { 20 + Did string `json:"did"` 21 + Rev string `json:"rev"` 22 + Ops []CommitOp `json:"ops"` 23 + } 24 + 25 + type CommitOp struct { 26 + Collection string `json:"collection"` 27 + Rkey string `json:"rkey"` 28 + Action string `json:"action"` 29 + Record map[string]interface{} `json:"record,omitempty"` 30 + Cid string `json:"cid,omitempty"` 31 + } 32 + 33 + func (c *Commit) ToOps() []*Op { 34 + var ops []*Op 35 + for _, op := range c.Ops { 36 + ops = append(ops, &Op{ 37 + Did: c.Did, 38 + Rev: c.Rev, 39 + Collection: op.Collection, 40 + Rkey: op.Rkey, 41 + Action: op.Action, 42 + Record: op.Record, 43 + Cid: op.Cid, 44 + }) 45 + } 46 + return ops 47 + } 48 + 49 + type Op struct { 50 + Did string `json:"did"` 51 + Rev string `json:"rev"` 52 + Collection string `json:"collection"` 53 + Rkey string `json:"rkey"` 54 + Action string `json:"action"` 55 + Record map[string]interface{} `json:"record,omitempty"` 56 + Cid string `json:"cid,omitempty"` 57 + } 58 + 59 + type EventProcessor struct { 60 + Logger *slog.Logger 61 + DB *gorm.DB 62 + Dir identity.Directory 63 + PersistCursorEvery int 64 + RelayHost string 65 + Outbox *Outbox 66 + 67 + eventCount uint64 68 + } 69 + 70 + func (ep *EventProcessor) ProcessCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 71 + // @TODO this should happen at end of processing 72 + // Persist cursor periodically 73 + count := atomic.AddUint64(&ep.eventCount, 1) 74 + if count%uint64(ep.PersistCursorEvery) == 0 { 75 + if err := ep.persistCursor(ep.RelayHost, evt.Seq); err != nil { 76 + ep.Logger.Error("failed to persist cursor", "seq", evt.Seq, "error", err) 77 + } 78 + } 79 + 80 + var d models.Did 81 + if err := ep.DB.First(&d, "did = ?", evt.Repo).Error; err != nil { 82 + if err != gorm.ErrRecordNotFound { 83 + ep.Logger.Error("failed to get repo state", "did", evt.Repo, "error", err) 84 + } 85 + return nil 86 + } 87 + 88 + if d.State == models.RepoStatePending { 89 + return nil 90 + } 91 + 92 + if d.Rev != "" && evt.Rev <= d.Rev { 93 + ep.Logger.Debug("skipping replayed event", "did", evt.Repo, "eventRev", evt.Rev, "currentRev", d.Rev) 94 + return nil 95 + } 96 + 97 + commit, err := ep.validateCommit(ctx, evt, &d) 98 + if err != nil { 99 + ep.Logger.Error("failed to parse operations", "did", evt.Repo, "error", err) 100 + return err 101 + } 102 + 103 + if d.State == models.RepoStateBackfilling { 104 + if err := ep.addToBackfillBuffer(commit); err != nil { 105 + ep.Logger.Error("failed to buffer commit", "did", evt.Repo, "error", err) 106 + return err 107 + } 108 + } 109 + 110 + for _, op := range commit.ToOps() { 111 + if err := ep.Outbox.Send(op); err != nil { 112 + ep.Logger.Error("failed to send to outbox", "did", commit.Did, "rev", commit.Rev, "error", err) 113 + return err 114 + } 115 + } 116 + 117 + if err := ep.updateRepoState(commit); err != nil { 118 + ep.Logger.Error("failed to update repo state", "did", commit.Did, "rev", commit.Rev, "error", err) 119 + return err 120 + } 121 + 122 + return nil 123 + } 124 + 125 + func (ep *EventProcessor) validateCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit, did *models.Did) (*Commit, error) { 126 + if err := repo.VerifyCommitSignature(ctx, ep.Dir, evt); err != nil { 127 + return nil, err 128 + } 129 + 130 + r, err := repo.VerifyCommitMessage(ctx, evt) 131 + if err != nil { 132 + return nil, err 133 + } 134 + 135 + var parsedOps []CommitOp 136 + 137 + for _, op := range evt.Ops { 138 + collection, rkey, err := syntax.ParseRepoPath(op.Path) 139 + if err != nil { 140 + return nil, fmt.Errorf("invalid record path: %w", err) 141 + } 142 + 143 + parsed := CommitOp{ 144 + Collection: collection.String(), 145 + Rkey: rkey.String(), 146 + Action: op.Action, 147 + } 148 + 149 + if op.Action == "create" || op.Action == "update" { 150 + if op.Cid == nil { 151 + return nil, fmt.Errorf("missing CID for create/update: %s", op.Path) 152 + } 153 + parsed.Cid = op.Cid.String() 154 + 155 + recBytes, _, err := r.GetRecordBytes(ctx, collection, rkey) 156 + if err != nil { 157 + ep.Logger.Error("failed to get record bytes", "did", evt.Repo, "path", op.Path, "error", err) 158 + continue 159 + } 160 + 161 + record, err := data.UnmarshalCBOR(recBytes) 162 + if err != nil { 163 + ep.Logger.Error("failed to unmarshal record", "did", evt.Repo, "path", op.Path, "error", err) 164 + continue 165 + } 166 + parsed.Record = record 167 + } 168 + 169 + parsedOps = append(parsedOps, parsed) 170 + } 171 + 172 + commit := &Commit{ 173 + Did: evt.Repo, 174 + Ops: parsedOps, 175 + } 176 + 177 + return commit, nil 178 + } 179 + 180 + func (ep *EventProcessor) updateRepoState(commit *Commit) error { 181 + return ep.DB.Transaction(func(tx *gorm.DB) error { 182 + if err := tx.Model(&models.Did{}). 183 + Where("did = ?", commit.Did). 184 + Updates(map[string]interface{}{ 185 + "rev": commit.Rev, 186 + }).Error; err != nil { 187 + return err 188 + } 189 + 190 + for _, op := range commit.Ops { 191 + if op.Action == "delete" { 192 + if err := tx.Delete(&models.RepoRecord{}, "did = ? AND collection = ? AND rkey = ?", commit.Did, op.Collection, op.Rkey).Error; err != nil { 193 + return err 194 + } 195 + } else { 196 + repoRecord := models.RepoRecord{ 197 + Did: commit.Did, 198 + Collection: op.Collection, 199 + Rkey: op.Rkey, 200 + Cid: op.Cid, 201 + } 202 + if err := tx.Save(&repoRecord).Error; err != nil { 203 + return err 204 + } 205 + } 206 + } 207 + 208 + return nil 209 + }) 210 + } 211 + 212 + func (ep *EventProcessor) addToBackfillBuffer(commit *Commit) error { 213 + jsonData, err := json.Marshal(commit) 214 + if err != nil { 215 + return err 216 + } 217 + return ep.DB.Create(&models.BackfillBuffer{ 218 + Did: commit.Did, 219 + Data: string(jsonData), 220 + }).Error 221 + } 222 + 223 + func (ep *EventProcessor) drainBackfillBuffer(ctx context.Context, did string) error { 224 + var bufferedEvts []models.BackfillBuffer 225 + if err := ep.DB.Where("did = ?", did).Order("id ASC").Find(&bufferedEvts).Error; err != nil { 226 + return fmt.Errorf("failed to load buffered events: %w", err) 227 + } 228 + 229 + if len(bufferedEvts) == 0 { 230 + return nil 231 + } 232 + 233 + ep.Logger.Info("processing buffered backfill events", "did", did, "count", len(bufferedEvts)) 234 + 235 + for _, evt := range bufferedEvts { 236 + var commit *Commit 237 + err := json.Unmarshal([]byte(evt.Data), commit) 238 + if err != nil { 239 + return fmt.Errorf("failed to unmarshal buffered event: %w", err) 240 + } 241 + 242 + for _, op := range commit.ToOps() { 243 + if err := ep.Outbox.Send(op); err != nil { 244 + ep.Logger.Error("failed to send to outbox", "did", commit.Did, "rev", commit.Rev, "error", err) 245 + return err 246 + } 247 + } 248 + 249 + if err := ep.updateRepoState(commit); err != nil { 250 + ep.Logger.Error("failed to update repo state", "did", commit.Did, "rev", commit.Rev, "error", err) 251 + return err 252 + } 253 + 254 + if err := ep.DB.Delete(&models.BackfillBuffer{}, "id = ?", evt.ID).Error; err != nil { 255 + ep.Logger.Error("failed to delete buffered event", "id", evt.ID, "did", commit.Did, "rev", commit.Rev, "error", err) 256 + return err 257 + } 258 + } 259 + 260 + ep.Logger.Info("processed buffered backfill events", "did", did, "count", len(bufferedEvts)) 261 + return nil 262 + } 263 + 264 + func (ep *EventProcessor) persistCursor(relayHost string, seq int64) error { 265 + if seq <= 0 { 266 + return nil 267 + } 268 + 269 + cursor := models.Cursor{ 270 + Host: relayHost, 271 + Cursor: seq, 272 + } 273 + 274 + return ep.DB.Save(&cursor).Error 275 + }