this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix some db locking

dholms 20beec47 86e2112b

+71 -75
+2 -2
nexus/nexus.go
··· 49 49 } 50 50 51 51 func NewNexus(config NexusConfig) (*Nexus, error) { 52 - db, err := gorm.Open(sqlite.Open(config.DBPath+"?_journal_mode=WAL"), &gorm.Config{ 52 + db, err := gorm.Open(sqlite.Open(config.DBPath+"?_journal_mode=WAL&_busy_timeout=10000"), &gorm.Config{ 53 53 Logger: logger.Default.LogMode(logger.Silent), 54 54 }) 55 55 if err != nil { ··· 150 150 }() 151 151 } 152 152 153 - for i := 0; i < 50; i++ { 153 + for i := 0; i < 20; i++ { 154 154 go n.runResyncWorker(context.Background(), i) 155 155 } 156 156
-2
nexus/outbox.go
··· 147 147 } 148 148 } 149 149 o.cacheMu.Unlock() 150 - 151 - o.logger.Info("loaded events into cache", "count", len(events), "cacheSize", len(o.eventCache)) 152 150 } 153 151 154 152 // runDelivery continuously pulls from pendingIDs and delivers events
+43 -55
nexus/processor.go
··· 15 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 16 "github.com/bluesky-social/indigo/nexus/models" 17 17 "gorm.io/gorm" 18 + "gorm.io/gorm/clause" 18 19 ) 19 20 20 21 type EventProcessor struct { ··· 42 43 ep.Logger.Error("failed to auto-track repo", "did", evt.Repo, "error", err) 43 44 return err 44 45 } 45 - ep.Logger.Info("auto-tracked new repo from firehose", "did", evt.Repo) 46 46 return nil 47 47 } 48 48 return nil ··· 95 95 } 96 96 97 97 if err := ep.DB.Transaction(func(tx *gorm.DB) error { 98 - if err := updateRepoState(tx, commit); err != nil { 99 - return err 100 - } 101 - 102 - for _, recEvt := range commit.ToEvts() { 103 - if err := persistRecordEvt(tx, recEvt); err != nil { 104 - return err 105 - } 106 - } 107 - 108 - return nil 98 + return applyCommit(tx, commit) 109 99 }); err != nil { 110 100 ep.Logger.Error("failed to update repo state", "did", commit.Did, "rev", commit.Rev, "error", err) 111 101 return err ··· 188 178 ep.Logger.Error("failed to auto-track repo", "did", evt.Did, "error", err) 189 179 return err 190 180 } 191 - ep.Logger.Info("auto-tracked new repo from firehose", "did", evt.Did) 192 181 return nil 193 182 } 194 183 return nil ··· 236 225 ep.Logger.Error("failed to auto-track repo", "did", did, "error", err) 237 226 return err 238 227 } 239 - ep.Logger.Info("auto-tracked new repo from firehose", "did", did) 240 228 return nil 241 229 } 242 230 return nil ··· 291 279 ep.Logger.Error("failed to auto-track repo", "did", evt.Did, "error", err) 292 280 return err 293 281 } 294 - ep.Logger.Info("auto-tracked new repo from firehose", "did", evt.Did) 295 282 return nil 296 283 } 297 284 return nil ··· 375 362 } 376 363 377 364 if err := ep.DB.Transaction(func(tx *gorm.DB) error { 378 - if err := updateRepoState(tx, &commit); err != nil { 365 + if err := applyCommit(tx, &commit); err != nil { 379 366 return err 380 367 } 381 - 382 - for _, recEvt := range commit.ToEvts() { 383 - if err := persistRecordEvt(tx, recEvt); err != nil { 384 - return err 385 - } 386 - } 387 - 388 - if err := tx.Delete(&models.ResyncBuffer{}, "id = ?", evt.ID).Error; err != nil { 389 - return err 390 - } 391 - 392 - return nil 368 + return tx.Delete(&models.ResyncBuffer{}, "id = ?", evt.ID).Error 393 369 }); err != nil { 394 370 ep.Logger.Error("failed to process buffered commit", "did", commit.Did, "rev", commit.Rev, "error", err) 395 371 return err ··· 401 377 return nil 402 378 } 403 379 404 - func updateRepoState(tx *gorm.DB, commit *Commit) error { 380 + func applyCommit(tx *gorm.DB, commit *Commit) error { 405 381 if err := tx.Model(&models.Repo{}). 406 382 Where("did = ?", commit.Did). 407 383 Updates(map[string]interface{}{ ··· 410 386 }).Error; err != nil { 411 387 return err 412 388 } 389 + 390 + var toPut []*models.RepoRecord 391 + var outboxBatch []*models.OutboxBuffer 413 392 414 393 for _, op := range commit.Ops { 415 394 if op.Action == "delete" { 416 - if err := tx.Delete(&models.RepoRecord{}, "did = ? AND collection = ? AND rkey = ?", commit.Did, op.Collection, op.Rkey).Error; err != nil { 395 + if err := tx.Delete(&models.RepoRecord{}, "did = ? AND collection = ? AND rkey = ?", 396 + commit.Did, op.Collection, op.Rkey).Error; err != nil { 417 397 return err 418 398 } 419 399 } else { 420 - repoRecord := models.RepoRecord{ 400 + toPut = append(toPut, &models.RepoRecord{ 421 401 Did: commit.Did, 422 402 Collection: op.Collection, 423 403 Rkey: op.Rkey, 424 404 Cid: op.Cid, 425 - } 426 - if err := tx.Save(&repoRecord).Error; err != nil { 427 - return err 428 - } 405 + }) 406 + } 407 + } 408 + 409 + if len(toPut) > 0 { 410 + if err := tx.Clauses(clause.OnConflict{ 411 + Columns: []clause.Column{{Name: "did"}, {Name: "collection"}, {Name: "rkey"}}, 412 + DoUpdates: clause.AssignmentColumns([]string{"cid"}), 413 + }).CreateInBatches(toPut, 100).Error; err != nil { 414 + return err 415 + } 416 + } 417 + 418 + for _, recEvt := range commit.ToEvts() { 419 + jsonData, err := json.Marshal(&OutboxEvt{ 420 + Type: "record", 421 + RecordEvt: recEvt, 422 + }) 423 + if err != nil { 424 + return err 425 + } 426 + outboxBatch = append(outboxBatch, &models.OutboxBuffer{ 427 + Live: recEvt.Live, 428 + Data: string(jsonData), 429 + }) 430 + } 431 + 432 + if len(outboxBatch) > 0 { 433 + if err := tx.CreateInBatches(outboxBatch, 100).Error; err != nil { 434 + return err 429 435 } 430 436 } 431 437 ··· 436 442 if err := tx.Delete(&models.RepoRecord{}, "did = ?", did).Error; err != nil { 437 443 return err 438 444 } 439 - 440 445 if err := tx.Delete(&models.ResyncBuffer{}, "did = ?", did).Error; err != nil { 441 446 return err 442 447 } 443 - 444 - if err := tx.Delete(&models.Repo{}, "did = ?", did).Error; err != nil { 445 - return err 446 - } 447 - 448 - return nil 449 - } 450 - 451 - func persistRecordEvt(tx *gorm.DB, evt *RecordEvt) error { 452 - return persistOutboxEvt(tx, &OutboxEvt{ 453 - Type: "record", 454 - RecordEvt: evt, 455 - }) 448 + return tx.Delete(&models.Repo{}, "did = ?", did).Error 456 449 } 457 450 458 451 func persistUserEvt(tx *gorm.DB, evt *UserEvt) error { 459 - return persistOutboxEvt(tx, &OutboxEvt{ 452 + jsonData, err := json.Marshal(&OutboxEvt{ 460 453 Type: "user", 461 454 UserEvt: evt, 462 455 }) 463 - } 464 - 465 - func persistOutboxEvt(tx *gorm.DB, evt *OutboxEvt) error { 466 - jsonData, err := json.Marshal(evt) 467 456 if err != nil { 468 457 return err 469 458 } 470 - 471 459 return tx.Create(&models.OutboxBuffer{ 472 460 Data: string(jsonData), 473 461 }).Error
+26 -16
nexus/resync.go
··· 3 3 import ( 4 4 "bytes" 5 5 "context" 6 + "encoding/json" 6 7 "fmt" 7 8 "net/http" 8 9 "time" ··· 15 16 "github.com/bluesky-social/indigo/xrpc" 16 17 "github.com/ipfs/go-cid" 17 18 "gorm.io/gorm" 19 + "gorm.io/gorm/clause" 18 20 ) 19 21 20 22 func (n *Nexus) runResyncWorker(ctx context.Context, workerID int) { ··· 239 241 return nil 240 242 } 241 243 242 - recordBatch := make([]*models.RepoRecord, 0, len(evtBatch)) 244 + var recordBatch []*models.RepoRecord 245 + var outboxBatch []*models.OutboxBuffer 246 + 243 247 for _, evt := range evtBatch { 244 248 recordBatch = append(recordBatch, &models.RepoRecord{ 245 249 Did: evt.Did, ··· 247 251 Rkey: evt.Rkey, 248 252 Cid: evt.Cid, 249 253 }) 250 - } 251 254 252 - if err := n.db.Transaction(func(tx *gorm.DB) error { 253 - for _, record := range recordBatch { 254 - if err := tx.Save(&record).Error; err != nil { 255 - return err 256 - } 255 + jsonData, err := json.Marshal(&OutboxEvt{ 256 + Type: "record", 257 + RecordEvt: evt, 258 + }) 259 + if err != nil { 260 + return err 257 261 } 262 + outboxBatch = append(outboxBatch, &models.OutboxBuffer{ 263 + Live: evt.Live, 264 + Data: string(jsonData), 265 + }) 266 + } 258 267 259 - for _, evt := range evtBatch { 260 - if err := persistRecordEvt(tx, evt); err != nil { 268 + return n.db.Transaction(func(tx *gorm.DB) error { 269 + if len(recordBatch) > 0 { 270 + if err := tx.Clauses(clause.OnConflict{ 271 + UpdateAll: true, 272 + }).CreateInBatches(recordBatch, 100).Error; err != nil { 261 273 return err 262 274 } 263 275 } 264 - 276 + if len(outboxBatch) > 0 { 277 + return tx.CreateInBatches(outboxBatch, 100).Error 278 + } 265 279 return nil 266 - }); err != nil { 267 - return err 268 - } 269 - 270 - return nil 280 + }) 271 281 } 272 282 273 283 func (n *Nexus) handleResyncError(did string, err error) error { ··· 295 305 "state": state, 296 306 "error_msg": errMsg, 297 307 "retry_count": repo.RetryCount + 1, 298 - "retry_after": retryAfter, 308 + "retry_after": retryAfter.Unix(), 299 309 }).Error 300 310 if dbErr != nil { 301 311 return dbErr