this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

do prev data checks

dholms 6f77b020 ebc228fb

+63 -44
+31 -29
nexus/backfill.go
··· 4 4 "bytes" 5 5 "context" 6 6 "fmt" 7 - "io" 8 7 "net/http" 9 8 "time" 10 9 11 10 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 11 "github.com/bluesky-social/indigo/atproto/data" 12 + "github.com/bluesky-social/indigo/atproto/repo" 13 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 14 "github.com/bluesky-social/indigo/nexus/models" 15 - "github.com/bluesky-social/indigo/repo" 16 15 "github.com/bluesky-social/indigo/xrpc" 17 16 "github.com/ipfs/go-cid" 18 17 "gorm.io/gorm" ··· 43 42 } 44 43 45 44 func (n *Nexus) claimBackfillJob(ctx context.Context) (string, bool, error) { 46 - var did models.Did 45 + var did models.Repo 47 46 err := n.db.Transaction(func(tx *gorm.DB) error { 48 47 if err := tx.Where("state = ?", models.RepoStatePending). 49 48 First(&did).Error; err != nil { 50 49 return err 51 50 } 52 51 53 - return tx.Model(&models.Did{}). 52 + return tx.Model(&models.Repo{}). 54 53 Where("did = ?", did.Did). 55 54 Update("state", models.RepoStateBackfilling).Error 56 55 }) ··· 68 67 func (n *Nexus) backfillDid(ctx context.Context, did string) error { 69 68 n.logger.Info("starting backfill", "did", did) 70 69 71 - rev, err := n.doBackfill(ctx, did) 70 + err := n.doBackfill(ctx, did) 72 71 if err != nil { 73 - n.db.Model(&models.Did{}). 72 + n.db.Model(&models.Repo{}). 74 73 Where("did = ?", did). 75 74 Updates(map[string]interface{}{ 76 75 "state": models.RepoStateError, 77 76 "rev": "", 77 + "prev_data": "", 78 78 "error_msg": err.Error(), 79 79 }) 80 80 return err 81 81 } 82 82 83 - if err := n.db.Model(&models.Did{}). 84 - Where("did = ?", did). 85 - Updates(map[string]interface{}{ 86 - "state": models.RepoStateActive, 87 - "rev": rev, 88 - "error_msg": "", 89 - }).Error; err != nil { 90 - return fmt.Errorf("failed to update state to active %w", err) 91 - } 92 - 93 83 if err := n.EventProcessor.drainBackfillBuffer(ctx, did); err != nil { 94 84 n.logger.Error("failed to drain backfill buffer events", "did", did, "error", err) 95 85 } ··· 97 87 return nil 98 88 } 99 89 100 - func (n *Nexus) doBackfill(ctx context.Context, did string) (string, error) { 90 + func (n *Nexus) doBackfill(ctx context.Context, did string) error { 101 91 ident, err := n.Dir.LookupDID(ctx, syntax.DID(did)) 102 92 if err != nil { 103 - return "", fmt.Errorf("failed to resolve DID: %w", err) 93 + return fmt.Errorf("failed to resolve DID: %w", err) 104 94 } 105 95 106 96 pdsURL := ident.PDSEndpoint() 107 97 if pdsURL == "" { 108 - return "", fmt.Errorf("no PDS endpoint for DID: %s", did) 98 + return fmt.Errorf("no PDS endpoint for DID: %s", did) 109 99 } 110 100 111 101 n.logger.Info("fetching repo from PDS", "did", did, "pds", pdsURL) ··· 117 107 118 108 repoBytes, err := comatproto.SyncGetRepo(ctx, client, did, "") 119 109 if err != nil { 120 - return "", fmt.Errorf("failed to get repo: %w", err) 110 + return fmt.Errorf("failed to get repo: %w", err) 121 111 } 122 112 123 113 n.logger.Info("parsing repo CAR", "did", did, "size", len(repoBytes)) 124 114 125 - r, err := repo.ReadRepoFromCar(ctx, io.NopCloser(bytes.NewReader(repoBytes))) 115 + commit, r, err := repo.LoadRepoFromCAR(ctx, bytes.NewReader(repoBytes)) 126 116 if err != nil { 127 - return "", fmt.Errorf("failed to read repo from CAR: %w", err) 117 + return fmt.Errorf("failed to read repo from CAR: %w", err) 128 118 } 129 119 130 - rev := r.SignedCommit().Rev 120 + rev := commit.Rev 131 121 n.logger.Info("iterating repo records", "did", did, "rev", rev) 132 122 133 123 var existingRecords []models.RepoRecord 134 124 if err := n.db.Find(&existingRecords, "did = ?", did).Error; err != nil { 135 - return "", fmt.Errorf("failed to load existing records: %w", err) 125 + return fmt.Errorf("failed to load existing records: %w", err) 136 126 } 137 127 138 128 existingCids := make(map[string]string, len(existingRecords)) ··· 144 134 145 135 numRecords := 0 146 136 147 - err = r.ForEach(ctx, "", func(recPath string, recCid cid.Cid) error { 137 + err = r.MST.Walk(func(recPathBytes []byte, recCid cid.Cid) error { 138 + recPath := string(recPathBytes) 148 139 collection, rkey, err := syntax.ParseRepoPath(recPath) 149 140 if err != nil { 150 141 n.logger.Error("invalid record path", "path", recPath, "error", err) ··· 165 156 action = "update" 166 157 } 167 158 168 - blk, err := r.Blockstore().Get(ctx, recCid) 159 + blk, err := r.RecordStore.Get(ctx, recCid) 169 160 if err != nil { 170 161 n.logger.Error("failed to get block", "path", recPath, "error", err) 171 162 return nil ··· 205 196 }) 206 197 207 198 if err != nil { 208 - return "", fmt.Errorf("failed to iterate repo: %w", err) 199 + return fmt.Errorf("failed to iterate repo: %w", err) 200 + } 201 + 202 + if err := n.db.Model(&models.Repo{}). 203 + Where("did = ?", did). 204 + Updates(map[string]interface{}{ 205 + "state": models.RepoStateActive, 206 + "rev": rev, 207 + "prev_data": commit.Data.String(), 208 + "error_msg": "", 209 + }).Error; err != nil { 210 + return fmt.Errorf("failed to update repo state to active %w", err) 209 211 } 210 212 211 213 n.logger.Info("backfill repo complete", "did", did, "records", numRecords, "rev", rev) 212 - return rev, nil 214 + return nil 213 215 } 214 216 215 217 func (n *Nexus) resetBackfillingToPending() error { 216 - return n.db.Model(&models.Did{}). 218 + return n.db.Model(&models.Repo{}). 217 219 Where("state = ?", models.RepoStateBackfilling). 218 220 Update("state", models.RepoStatePending).Error 219 221 }
+2 -2
nexus/handlers.go
··· 50 50 return err 51 51 } 52 52 53 - dids := make([]models.Did, len(payload.DIDs)) 53 + dids := make([]models.Repo, len(payload.DIDs)) 54 54 for i, did := range payload.DIDs { 55 - dids[i] = models.Did{ 55 + dids[i] = models.Repo{ 56 56 Did: did, 57 57 State: models.RepoStatePending, 58 58 }
+1 -1
nexus/main.go
··· 12 12 func main() { 13 13 nexus, err := NewNexus(NexusConfig{ 14 14 DBPath: "./nexus.db", 15 - RelayHost: "https://bsky.network", 15 + RelayHost: "wss://relay1.us-east.bsky.network", 16 16 }) 17 17 if err != nil { 18 18 log.Fatal(err)
+2 -1
nexus/models/models.go
··· 9 9 RepoStateError RepoState = "error" 10 10 ) 11 11 12 - type Did struct { 12 + type Repo struct { 13 13 Did string `gorm:"primaryKey"` 14 14 State RepoState `gorm:"not null;default:'pending';index"` 15 15 Rev string `gorm:"type:text"` 16 + PrevData string `gorm:"type:text"` 16 17 ErrorMsg string `gorm:"type:text"` 17 18 } 18 19
+1 -1
nexus/nexus.go
··· 43 43 return nil, err 44 44 } 45 45 46 - if err := db.AutoMigrate(&models.Did{}, &models.RepoRecord{}, &models.OutboxBuffer{}, &models.BackfillBuffer{}, &models.Cursor{}); err != nil { 46 + if err := db.AutoMigrate(&models.Repo{}, &models.RepoRecord{}, &models.OutboxBuffer{}, &models.BackfillBuffer{}, &models.Cursor{}); err != nil { 47 47 return nil, err 48 48 } 49 49
+22 -7
nexus/processor.go
··· 31 31 func (ep *EventProcessor) ProcessCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 32 32 defer ep.trackLastSeq(evt.Seq) 33 33 34 - var d models.Did 34 + var d models.Repo 35 35 if err := ep.DB.First(&d, "did = ?", evt.Repo).Error; err != nil { 36 36 if err != gorm.ErrRecordNotFound { 37 37 ep.Logger.Error("failed to get repo state", "did", evt.Repo, "error", err) ··· 48 48 return nil 49 49 } 50 50 51 - commit, err := ep.validateCommit(ctx, evt, &d) 51 + if evt.PrevData == nil { 52 + ep.Logger.Debug("legacy commit event, skipping prev data check", "did", evt.Repo, "rev", evt.Rev) 53 + } else if evt.PrevData.String() != d.PrevData { 54 + // @TODO DESYNCED 55 + ep.Logger.Warn("repo state desynced", "did", evt.Repo, "rev", evt.Rev) 56 + } 57 + 58 + commit, err := ep.validateCommit(ctx, evt) 52 59 if err != nil { 53 60 ep.Logger.Error("failed to parse operations", "did", evt.Repo, "error", err) 54 61 return err ··· 76 83 return nil 77 84 } 78 85 79 - func (ep *EventProcessor) validateCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit, did *models.Did) (*Commit, error) { 86 + func (ep *EventProcessor) validateCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) (*Commit, error) { 80 87 if err := repo.VerifyCommitSignature(ctx, ep.Dir, evt); err != nil { 81 88 return nil, err 82 89 } ··· 123 130 parsedOps = append(parsedOps, parsed) 124 131 } 125 132 133 + repoCommit, err := r.Commit() 134 + if err != nil { 135 + return nil, err 136 + } 137 + 126 138 commit := &Commit{ 127 - Did: evt.Repo, 128 - Ops: parsedOps, 139 + Did: evt.Repo, 140 + Rev: repoCommit.Rev, 141 + DataCid: repoCommit.Data.String(), 142 + Ops: parsedOps, 129 143 } 130 144 131 145 return commit, nil ··· 133 147 134 148 func (ep *EventProcessor) updateRepoState(commit *Commit) error { 135 149 return ep.DB.Transaction(func(tx *gorm.DB) error { 136 - if err := tx.Model(&models.Did{}). 150 + if err := tx.Model(&models.Repo{}). 137 151 Where("did = ?", commit.Did). 138 152 Updates(map[string]interface{}{ 139 - "rev": commit.Rev, 153 + "rev": commit.Rev, 154 + "prev_data": commit.DataCid, 140 155 }).Error; err != nil { 141 156 return err 142 157 }
+4 -3
nexus/types.go
··· 1 1 package main 2 2 3 3 type Commit struct { 4 - Did string `json:"did"` 5 - Rev string `json:"rev"` 6 - Ops []CommitOp `json:"ops"` 4 + Did string `json:"did"` 5 + Rev string `json:"rev"` 6 + DataCid string `json:"data_cid"` 7 + Ops []CommitOp `json:"ops"` 7 8 } 8 9 9 10 type CommitOp struct {