this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

persiste all messages to outbox buffer

dholms 7e8df665 a405f134

+182 -151
+10 -3
nexus/handlers.go
··· 35 35 36 36 n.logger.Info("websocket connected") 37 37 38 - return n.outbox.Subscribe(c.Request().Context(), func(evt *OutboxEvt) error { 39 - return ws.WriteJSON(evt) 40 - }) 38 + evtCh := n.outbox.Subscribe(c.Request().Context()) 39 + 40 + for evt := range evtCh { 41 + if err := ws.WriteJSON(evt); err != nil { 42 + n.logger.Info("websocket write error", "error", err) 43 + return err 44 + } 45 + } 46 + 47 + return nil 41 48 } 42 49 43 50 type DidPayload struct {
+1
nexus/nexus.go
··· 122 122 } 123 123 124 124 go n.EventProcessor.RunCursorSaver(context.Background(), cursorSaveInterval) 125 + go n.outbox.Run(context.Background()) 125 126 126 127 n.registerRoutes() 127 128
+40 -61
nexus/outbox.go
··· 4 4 "context" 5 5 "encoding/json" 6 6 "log/slog" 7 + "time" 7 8 8 9 "github.com/bluesky-social/indigo/nexus/models" 9 10 "gorm.io/gorm" ··· 11 12 12 13 type Outbox struct { 13 14 db *gorm.DB 14 - outCh chan *OutboxEvt 15 15 logger *slog.Logger 16 + notify chan struct{} 17 + events chan *OutboxEvt 16 18 } 17 19 18 20 func NewOutbox(db *gorm.DB) *Outbox { 19 21 return &Outbox{ 20 22 db: db, 21 - outCh: make(chan *OutboxEvt, 100), 22 23 logger: slog.Default().With("system", "outbox"), 24 + notify: make(chan struct{}), 25 + events: make(chan *OutboxEvt, 1000), 23 26 } 24 27 } 25 28 26 - func (o *Outbox) Subscribe(ctx context.Context, send func(evt *OutboxEvt) error) error { 27 - var bufferedEvts []models.OutboxBuffer 28 - if err := o.db.Order("id ASC").Find(&bufferedEvts).Error; err != nil { 29 - o.logger.Error("failed to load buffered events", "error", err) 30 - return err 31 - } 29 + func (o *Outbox) Run(ctx context.Context) { 30 + ticker := time.NewTicker(100 * time.Millisecond) 31 + defer ticker.Stop() 32 32 33 - if len(bufferedEvts) > 0 { 34 - o.logger.Info("draining buffered events", "count", len(bufferedEvts)) 35 - for _, evt := range bufferedEvts { 36 - var outboxEvt OutboxEvt 37 - if err := json.Unmarshal([]byte(evt.Data), &outboxEvt); err != nil { 38 - o.logger.Error("failed to unmarshal buffered event", "error", err, "id", evt.ID) 39 - continue 40 - } 41 - 42 - if err := send(&outboxEvt); err != nil { 43 - o.logger.Info("send error during drain", "error", err) 44 - return err 45 - } 46 - } 47 - 48 - if err := o.db.Delete(&bufferedEvts).Error; err != nil { 49 - o.logger.Error("failed to delete buffered events", "error", err) 50 - } else { 51 - o.logger.Info("cleared buffered events", "count", len(bufferedEvts)) 52 - } 53 - } 54 - 55 - o.logger.Info("starting live event stream") 56 33 for { 57 34 select { 58 35 case <-ctx.Done(): 59 - return ctx.Err() 60 - case evt := <-o.outCh: 61 - if err := send(evt); err != nil { 62 - o.logger.Info("send error during live stream", "error", err) 63 - return err 64 - } 36 + return 37 + case <-o.notify: 38 + o.deliverPending() 39 + case <-ticker.C: 40 + o.deliverPending() 65 41 } 66 42 } 67 43 } 68 44 69 - func (o *Outbox) SendRecordEvt(evt *RecordEvt) error { 70 - return o.SendOutboxEvt(&OutboxEvt{ 71 - Type: "record", 72 - RecordEvt: evt, 73 - }) 74 - } 45 + func (o *Outbox) deliverPending() { 46 + var events []models.OutboxBuffer 47 + if err := o.db.Order("id ASC").Limit(100).Find(&events).Error; err != nil { 48 + o.logger.Error("failed to query outbox buffer", "error", err) 49 + return 50 + } 51 + 52 + if len(events) == 0 { 53 + return 54 + } 75 55 76 - func (o *Outbox) SendUserEvt(evt *UserEvt) error { 77 - return o.SendOutboxEvt(&OutboxEvt{ 78 - Type: "user", 79 - UserEvt: evt, 80 - }) 56 + for _, evt := range events { 57 + var outboxEvt OutboxEvt 58 + if err := json.Unmarshal([]byte(evt.Data), &outboxEvt); err != nil { 59 + o.logger.Error("failed to unmarshal outbox event", "error", err, "id", evt.ID) 60 + continue 61 + } 62 + 63 + o.events <- &outboxEvt 64 + 65 + if err := o.db.Delete(&evt).Error; err != nil { 66 + o.logger.Error("failed to delete outbox event", "error", err, "id", evt.ID) 67 + } 68 + } 81 69 } 82 70 83 - func (o *Outbox) SendOutboxEvt(evt *OutboxEvt) error { 71 + func (o *Outbox) Notify() { 84 72 select { 85 - case o.outCh <- evt: 86 - return nil 73 + case o.notify <- struct{}{}: 87 74 default: 88 - return o.bufferEvent(evt) 89 75 } 90 76 } 91 77 92 - func (o *Outbox) bufferEvent(evt *OutboxEvt) error { 93 - jsonData, err := json.Marshal(evt) 94 - if err != nil { 95 - return err 96 - } 97 - 98 - return o.db.Create(&models.OutboxBuffer{ 99 - Data: string(jsonData), 100 - }).Error 78 + func (o *Outbox) Subscribe(ctx context.Context) <-chan *OutboxEvt { 79 + return o.events 101 80 }
+121 -81
nexus/processor.go
··· 71 71 } 72 72 } 73 73 74 - for _, recEvt := range commit.ToEvts() { 75 - if err := ep.Outbox.SendRecordEvt(recEvt); err != nil { 76 - ep.Logger.Error("failed to send to outbox", "did", commit.Did, "rev", commit.Rev, "error", err) 74 + if err := ep.DB.Transaction(func(tx *gorm.DB) error { 75 + if err := updateRepoState(tx, commit); err != nil { 77 76 return err 78 77 } 79 - } 80 78 81 - if err := ep.updateRepoState(commit); err != nil { 79 + for _, recEvt := range commit.ToEvts() { 80 + if err := persistRecordEvt(tx, recEvt); err != nil { 81 + return err 82 + } 83 + } 84 + 85 + return nil 86 + }); err != nil { 82 87 ep.Logger.Error("failed to update repo state", "did", commit.Did, "rev", commit.Rev, "error", err) 83 88 return err 84 89 } 85 90 91 + ep.Outbox.Notify() 86 92 return nil 87 93 } 88 94 ··· 212 218 return nil 213 219 } 214 220 215 - if err := ep.DB.Model(&models.Repo{}). 216 - Where("did = ?", did). 217 - Update("handle", handleStr).Error; err != nil { 218 - ep.Logger.Error("failed to update handle", "did", did, "handle", handleStr, "error", err) 219 - return err 220 - } 221 - 222 221 userEvt := &UserEvt{ 223 222 Did: did, 224 223 Handle: handleStr, ··· 226 225 Status: curr.Status, 227 226 } 228 227 229 - if err := ep.Outbox.SendUserEvt(userEvt); err != nil { 230 - ep.Logger.Error("failed to send user evt", "did", did, "error", err) 228 + if err := ep.DB.Transaction(func(tx *gorm.DB) error { 229 + if err := tx.Model(&models.Repo{}). 230 + Where("did = ?", did). 231 + Update("handle", handleStr).Error; err != nil { 232 + return err 233 + } 234 + 235 + return persistUserEvt(tx, userEvt) 236 + }); err != nil { 237 + ep.Logger.Error("failed to update handle", "did", did, "handle", handleStr, "error", err) 231 238 return err 232 239 } 233 240 241 + ep.Outbox.Notify() 234 242 return nil 235 243 } 236 244 ··· 258 266 return nil 259 267 } 260 268 269 + userEvt := &UserEvt{ 270 + Did: curr.Did, 271 + Handle: curr.Handle, 272 + IsActive: evt.Active, 273 + Status: updateTo, 274 + } 275 + 261 276 if updateTo == models.AccountStatusDeleted { 262 - err := ep.DeleteRepo(evt.Did) 263 - if err != nil { 277 + if err := ep.DB.Transaction(func(tx *gorm.DB) error { 278 + if err := deleteRepo(tx, evt.Did); err != nil { 279 + return err 280 + } 281 + return persistUserEvt(tx, userEvt) 282 + }); err != nil { 264 283 ep.Logger.Error("failed to delete repo", "did", evt.Did, "error", err) 265 284 return err 266 285 } 267 286 } else { 268 - err = ep.DB.Model(&models.Repo{}). 269 - Where("did = ?", evt.Did). 270 - Update("status", updateTo).Error 271 - if err != nil { 272 - ep.Logger.Error("failed to update repo status", "did", evt.Did, "status", models.AccountStatusActive, "error", err) 287 + if err := ep.DB.Transaction(func(tx *gorm.DB) error { 288 + if err := tx.Model(&models.Repo{}). 289 + Where("did = ?", evt.Did). 290 + Update("status", updateTo).Error; err != nil { 291 + return err 292 + } 293 + return persistUserEvt(tx, userEvt) 294 + }); err != nil { 295 + ep.Logger.Error("failed to update repo status", "did", evt.Did, "status", updateTo, "error", err) 273 296 return err 274 297 } 275 298 } 276 299 277 - err = ep.Outbox.SendUserEvt(&UserEvt{ 278 - Did: curr.Did, 279 - Handle: curr.Handle, 280 - IsActive: evt.Active, 281 - Status: updateTo, 282 - }) 283 - if err != nil { 284 - ep.Logger.Error("failed to send user evt", "did", evt.Did, "error", err) 285 - return err 286 - } 287 - 300 + ep.Outbox.Notify() 288 301 return nil 289 302 } 290 303 ··· 317 330 return fmt.Errorf("failed to unmarshal buffered event: %w", err) 318 331 } 319 332 320 - for _, recEvt := range commit.ToEvts() { 321 - if err := ep.Outbox.SendRecordEvt(recEvt); err != nil { 322 - ep.Logger.Error("failed to send to outbox", "did", commit.Did, "rev", commit.Rev, "error", err) 333 + if err := ep.DB.Transaction(func(tx *gorm.DB) error { 334 + if err := updateRepoState(tx, &commit); err != nil { 323 335 return err 324 336 } 325 - } 326 337 327 - if err := ep.updateRepoState(&commit); err != nil { 328 - ep.Logger.Error("failed to update repo state", "did", commit.Did, "rev", commit.Rev, "error", err) 329 - return err 330 - } 338 + for _, recEvt := range commit.ToEvts() { 339 + if err := persistRecordEvt(tx, recEvt); err != nil { 340 + return err 341 + } 342 + } 331 343 332 - if err := ep.DB.Delete(&models.ResyncBuffer{}, "id = ?", evt.ID).Error; err != nil { 333 - ep.Logger.Error("failed to delete buffered event", "id", evt.ID, "did", commit.Did, "rev", commit.Rev, "error", err) 344 + if err := tx.Delete(&models.ResyncBuffer{}, "id = ?", evt.ID).Error; err != nil { 345 + return err 346 + } 347 + 348 + return nil 349 + }); err != nil { 350 + ep.Logger.Error("failed to process buffered commit", "did", commit.Did, "rev", commit.Rev, "error", err) 334 351 return err 335 352 } 353 + 354 + ep.Outbox.Notify() 336 355 } 337 356 338 357 ep.Logger.Info("processed buffered resync events", "did", did, "count", len(bufferedEvts)) 339 358 return nil 340 359 } 341 360 342 - func (ep *EventProcessor) updateRepoState(commit *Commit) error { 343 - return ep.DB.Transaction(func(tx *gorm.DB) error { 344 - if err := tx.Model(&models.Repo{}). 345 - Where("did = ?", commit.Did). 346 - Updates(map[string]interface{}{ 347 - "rev": commit.Rev, 348 - "prev_data": commit.DataCid, 349 - }).Error; err != nil { 350 - return err 351 - } 361 + func updateRepoState(tx *gorm.DB, commit *Commit) error { 362 + if err := tx.Model(&models.Repo{}). 363 + Where("did = ?", commit.Did). 364 + Updates(map[string]interface{}{ 365 + "rev": commit.Rev, 366 + "prev_data": commit.DataCid, 367 + }).Error; err != nil { 368 + return err 369 + } 352 370 353 - for _, op := range commit.Ops { 354 - if op.Action == "delete" { 355 - if err := tx.Delete(&models.RepoRecord{}, "did = ? AND collection = ? AND rkey = ?", commit.Did, op.Collection, op.Rkey).Error; err != nil { 356 - return err 357 - } 358 - } else { 359 - repoRecord := models.RepoRecord{ 360 - Did: commit.Did, 361 - Collection: op.Collection, 362 - Rkey: op.Rkey, 363 - Cid: op.Cid, 364 - } 365 - if err := tx.Save(&repoRecord).Error; err != nil { 366 - return err 367 - } 371 + for _, op := range commit.Ops { 372 + if op.Action == "delete" { 373 + if err := tx.Delete(&models.RepoRecord{}, "did = ? AND collection = ? AND rkey = ?", commit.Did, op.Collection, op.Rkey).Error; err != nil { 374 + return err 375 + } 376 + } else { 377 + repoRecord := models.RepoRecord{ 378 + Did: commit.Did, 379 + Collection: op.Collection, 380 + Rkey: op.Rkey, 381 + Cid: op.Cid, 382 + } 383 + if err := tx.Save(&repoRecord).Error; err != nil { 384 + return err 368 385 } 369 386 } 387 + } 370 388 371 - return nil 372 - }) 389 + return nil 373 390 } 374 391 375 - func (ep *EventProcessor) DeleteRepo(did string) error { 376 - return ep.DB.Transaction(func(tx *gorm.DB) error { 377 - if err := tx.Delete(&models.RepoRecord{}, "did = ?", did).Error; err != nil { 378 - return err 379 - } 392 + func deleteRepo(tx *gorm.DB, did string) error { 393 + if err := tx.Delete(&models.RepoRecord{}, "did = ?", did).Error; err != nil { 394 + return err 395 + } 380 396 381 - if err := tx.Delete(&models.ResyncBuffer{}, "did = ?", did).Error; err != nil { 382 - return err 383 - } 397 + if err := tx.Delete(&models.ResyncBuffer{}, "did = ?", did).Error; err != nil { 398 + return err 399 + } 384 400 385 - if err := tx.Delete(&models.Repo{}, "did = ?", did).Error; err != nil { 386 - return err 387 - } 401 + if err := tx.Delete(&models.Repo{}, "did = ?", did).Error; err != nil { 402 + return err 403 + } 388 404 389 - return nil 405 + return nil 406 + } 407 + 408 + func persistRecordEvt(tx *gorm.DB, evt *RecordEvt) error { 409 + return persistOutboxEvt(tx, &OutboxEvt{ 410 + Type: "record", 411 + RecordEvt: evt, 390 412 }) 413 + } 414 + 415 + func persistUserEvt(tx *gorm.DB, evt *UserEvt) error { 416 + return persistOutboxEvt(tx, &OutboxEvt{ 417 + Type: "user", 418 + UserEvt: evt, 419 + }) 420 + } 421 + 422 + func persistOutboxEvt(tx *gorm.DB, evt *OutboxEvt) error { 423 + jsonData, err := json.Marshal(evt) 424 + if err != nil { 425 + return err 426 + } 427 + 428 + return tx.Create(&models.OutboxBuffer{ 429 + Data: string(jsonData), 430 + }).Error 391 431 } 392 432 393 433 func (ep *EventProcessor) saveCursor(ctx context.Context) error {
+10 -6
nexus/resync.go
··· 182 182 Cid: recCid.String(), 183 183 } 184 184 185 - if err := n.outbox.SendRecordEvt(evt); err != nil { 186 - return fmt.Errorf("failed to send evt: %w", err) 187 - } 188 - 189 185 repoRecord := models.RepoRecord{ 190 186 Did: did, 191 187 Collection: collStr, 192 188 Rkey: rkeyStr, 193 189 Cid: cidStr, 194 190 } 195 - if err := n.db.Save(&repoRecord).Error; err != nil { 196 - n.logger.Error("failed to save repo record", "error", err, "did", did, "path", recPath) 191 + 192 + if err := n.db.Transaction(func(tx *gorm.DB) error { 193 + if err := tx.Save(&repoRecord).Error; err != nil { 194 + return err 195 + } 196 + return persistRecordEvt(tx, evt) 197 + }); err != nil { 198 + n.logger.Error("failed to save record and persist event", "error", err, "did", did, "path", recPath) 199 + return nil 197 200 } 198 201 202 + n.outbox.Notify() 199 203 numRecords++ 200 204 return nil 201 205 })