this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

mutex for claiming jobs + batch repo record writes

dholms da526d34 8546ef7e

+73 -35
+3
nexus/nexus.go
··· 3 3 import ( 4 4 "context" 5 5 "log/slog" 6 + "sync" 6 7 "time" 7 8 8 9 comatproto "github.com/bluesky-social/indigo/api/atproto" ··· 29 30 30 31 FullNetworkMode bool 31 32 RelayHost string 33 + 34 + claimJobMu sync.Mutex 32 35 } 33 36 34 37 type NexusConfig struct {
+70 -35
nexus/resync.go
··· 42 42 } 43 43 44 44 func (n *Nexus) claimResyncJob(ctx context.Context) (string, bool, error) { 45 - var did models.Repo 46 - err := n.db.Transaction(func(tx *gorm.DB) error { 47 - if err := tx.Where("state IN (?)", []models.RepoState{models.RepoStatePending, models.RepoStateDesynced}). 48 - First(&did).Error; err != nil { 49 - return err 50 - } 45 + n.claimJobMu.Lock() 46 + defer n.claimJobMu.Unlock() 51 47 52 - return tx.Model(&models.Repo{}). 53 - Where("did = ?", did.Did). 54 - Update("state", models.RepoStateResyncing).Error 55 - }) 56 - 57 - if err != nil { 58 - if err == gorm.ErrRecordNotFound { 59 - return "", false, nil 60 - } 61 - return "", false, err 48 + var did string 49 + result := n.db.Raw(` 50 + UPDATE repos 51 + SET state = ? 52 + WHERE did = ( 53 + SELECT did FROM repos 54 + WHERE state IN (?, ?) 55 + ORDER BY RANDOM() 56 + LIMIT 1 57 + ) 58 + RETURNING did 59 + `, models.RepoStateResyncing, models.RepoStatePending, models.RepoStateDesynced).Scan(&did) 60 + if result.Error != nil { 61 + return "", false, result.Error 62 + } 63 + if result.RowsAffected == 0 { 64 + return "", false, nil 62 65 } 63 - 64 - return did.Did, true, nil 66 + return did, true, nil 65 67 } 66 68 67 69 func (n *Nexus) resyncDid(ctx context.Context, did string) error { ··· 91 93 92 94 return nil 93 95 } 96 + 97 + const BATCH_SIZE = 100 94 98 95 99 func (n *Nexus) doResync(ctx context.Context, did string) error { 96 100 ident, err := n.Dir.LookupDID(ctx, syntax.DID(did)) ··· 137 141 } 138 142 n.logger.Info("pre-loaded existing records", "did", did, "count", len(existingCids)) 139 143 140 - numRecords := 0 144 + var evtBatch []*RecordEvt 141 145 142 146 err = r.MST.Walk(func(recPathBytes []byte, recCid cid.Cid) error { 143 147 recPath := string(recPathBytes) ··· 181 185 Record: rec, 182 186 Cid: recCid.String(), 183 187 } 188 + evtBatch = append(evtBatch, evt) 184 189 185 - repoRecord := models.RepoRecord{ 186 - Did: did, 187 - Collection: collStr, 188 - Rkey: rkeyStr, 189 - Cid: cidStr, 190 - } 191 - 192 - if err := n.db.Transaction(func(tx *gorm.DB) error { 193 - if err := tx.Save(&repoRecord).Error; err != nil { 190 + if len(evtBatch) >= BATCH_SIZE { 191 + if err := n.writeBatch(evtBatch); err != nil { 192 + n.logger.Error("failed to flush batch", "error", err, "did", did) 194 193 return err 195 194 } 196 - return persistRecordEvt(tx, evt) 197 - }); err != nil { 198 - n.logger.Error("failed to save record and persist event", "error", err, "did", did, "path", recPath) 199 - return nil 200 195 } 196 + evtBatch = evtBatch[:0] 201 197 202 - n.outbox.Notify() 203 - numRecords++ 204 198 return nil 205 199 }) 206 200 207 201 if err != nil { 208 202 return fmt.Errorf("failed to iterate repo: %w", err) 203 + } 204 + 205 + if err := n.writeBatch(evtBatch); err != nil { 206 + return fmt.Errorf("failed to flush final batch: %w", err) 209 207 } 210 208 211 209 if err := n.db.Model(&models.Repo{}). ··· 219 217 return fmt.Errorf("failed to update repo state to active %w", err) 220 218 } 221 219 222 - n.logger.Info("resync repo complete", "did", did, "records", numRecords, "rev", rev) 220 + n.logger.Info("resync repo complete", "did", did, "rev", rev) 221 + return nil 222 + } 223 + 224 + func (n *Nexus) writeBatch(evtBatch []*RecordEvt) error { 225 + if len(evtBatch) == 0 { 226 + return nil 227 + } 228 + 229 + recordBatch := make([]*models.RepoRecord, 0, len(evtBatch)) 230 + for _, evt := range evtBatch { 231 + recordBatch = append(recordBatch, &models.RepoRecord{ 232 + Did: evt.Did, 233 + Collection: evt.Collection, 234 + Rkey: evt.Rkey, 235 + Cid: evt.Cid, 236 + }) 237 + } 238 + 239 + if err := n.db.Transaction(func(tx *gorm.DB) error { 240 + for _, record := range recordBatch { 241 + if err := tx.Save(&record).Error; err != nil { 242 + return err 243 + } 244 + } 245 + 246 + for _, evt := range evtBatch { 247 + if err := persistRecordEvt(tx, evt); err != nil { 248 + return err 249 + } 250 + } 251 + 252 + return nil 253 + }); err != nil { 254 + return err 255 + } 256 + 257 + n.outbox.Notify() 223 258 return nil 224 259 } 225 260