this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

tidy

dholms 9a1fe128 0c4350b5

+25 -23
+8 -6
nexus/firehose.go
··· 62 62 63 63 rsc := &events.RepoStreamCallbacks{ 64 64 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 65 + if fc.Filter.Contains(evt.Repo) { 66 + err := fc.OnCommit(ctx, evt) 67 + if err != nil { 68 + return err 69 + } 70 + } 71 + 65 72 if eventCount.Add(1)%uint64(fc.PersistCursorEvery) == 0 { 66 73 if err := fc.persistCursor(ctx, evt.Seq); err != nil { 67 74 fc.Logger.Error("failed to persist cursor", "seq", evt.Seq, "error", err) 68 75 } 69 76 } 70 - 71 - if !fc.Filter.Contains(evt.Repo) { 72 - return nil 73 - } 74 - 75 - return fc.OnCommit(ctx, evt) 77 + return nil 76 78 }, 77 79 } 78 80
+3 -3
nexus/handlers.go
··· 50 50 return err 51 51 } 52 52 53 - filterDids := make([]models.FilterDid, len(payload.DIDs)) 53 + dids := make([]models.Did, len(payload.DIDs)) 54 54 for i, did := range payload.DIDs { 55 - filterDids[i] = models.FilterDid{ 55 + dids[i] = models.Did{ 56 56 Did: did, 57 57 State: models.RepoStatePending, 58 58 } 59 59 } 60 60 61 - if err := n.db.Save(&filterDids).Error; err != nil { 61 + if err := n.db.Save(&dids).Error; err != nil { 62 62 n.logger.Error("failed to upsert dids", "error", err) 63 63 return echo.NewHTTPError(http.StatusInternalServerError) 64 64 }
+1 -1
nexus/models/models.go
··· 11 11 RepoStateError RepoState = "error" 12 12 ) 13 13 14 - type FilterDid struct { 14 + type Did struct { 15 15 Did string `gorm:"primaryKey"` 16 16 State RepoState `gorm:"not null;default:'pending'"` 17 17 Rev string `gorm:"type:text"`
+13 -13
nexus/nexus.go
··· 48 48 return nil, err 49 49 } 50 50 51 - if err := db.AutoMigrate(&models.BufferedEvt{}, &models.FilterDid{}, &models.RepoRecord{}, &models.BackfillBuffer{}, &models.Cursor{}); err != nil { 51 + if err := db.AutoMigrate(&models.BufferedEvt{}, &models.Did{}, &models.RepoRecord{}, &models.BackfillBuffer{}, &models.Cursor{}); err != nil { 52 52 return nil, err 53 53 } 54 54 ··· 135 135 } 136 136 137 137 func (n *Nexus) LoadFilters() error { 138 - var filterDids []models.FilterDid 139 - if err := n.db.Find(&filterDids).Error; err != nil { 138 + var dids []models.Did 139 + if err := n.db.Find(&dids).Error; err != nil { 140 140 return err 141 141 } 142 142 143 - dids := make([]string, 0, len(filterDids)) 144 - for _, f := range filterDids { 145 - dids = append(dids, f.Did) 143 + didStrings := make([]string, 0, len(dids)) 144 + for _, d := range dids { 145 + didStrings = append(didStrings, d.Did) 146 146 147 - if f.State == models.RepoStatePending || f.State == models.RepoStateBackfilling { 148 - n.queueBackfill(f.Did) 147 + if d.State == models.RepoStatePending || d.State == models.RepoStateBackfilling { 148 + n.queueBackfill(d.Did) 149 149 } 150 150 } 151 151 152 - n.filter.AddBatch(dids) 152 + n.filter.AddBatch(didStrings) 153 153 return nil 154 154 } 155 155 ··· 172 172 } 173 173 174 174 func (n *Nexus) GetRepoState(did string) (models.RepoState, error) { 175 - var filterDid models.FilterDid 176 - if err := n.db.First(&filterDid, "did = ?", did).Error; err != nil { 175 + var d models.Did 176 + if err := n.db.First(&d, "did = ?", did).Error; err != nil { 177 177 return "", err 178 178 } 179 - return filterDid.State, nil 179 + return d.State, nil 180 180 } 181 181 182 182 func (n *Nexus) UpdateRepoState(did string, state models.RepoState, rev string, errorMsg string) error { 183 - return n.db.Model(&models.FilterDid{}). 183 + return n.db.Model(&models.Did{}). 184 184 Where("did = ?", did). 185 185 Updates(map[string]interface{}{ 186 186 "state": state,