this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

disk based event persistence (#215)

Still need to implement some of the trickier parts like takedowns and
rebases, but the primary IO flows are here.

authored by

Whyrusleeping and committed by
GitHub
7668894b 01b8b70d

+1218 -24
+19 -4
cmd/bigsky/main.go
··· 107 107 Name: "disk-blob-store", 108 108 }, 109 109 &cli.StringFlag{ 110 + Name: "disk-persister-dir", 111 + Usage: "set directory for disk persister (implicitly enables disk persister)", 112 + }, 113 + &cli.StringFlag{ 110 114 Name: "admin-key", 111 115 EnvVars: []string{"BGS_ADMIN_KEY"}, 112 116 }, ··· 224 228 225 229 repoman := repomgr.NewRepoManager(repomgr.NewDbHeadStore(db), cstore, kmgr) 226 230 227 - dbp, err := events.NewDbPersistence(db, cstore, nil) 228 - if err != nil { 229 - return fmt.Errorf("setting up db event persistence: %w", err) 231 + var persister events.EventPersistence 232 + 233 + if dpd := cctx.String("disk-persister-dir"); dpd != "" { 234 + dp, err := events.NewDiskPersistence(dpd, "", db, events.DefaultDiskPersistOptions()) 235 + if err != nil { 236 + return fmt.Errorf("setting up disk persister: %w", err) 237 + } 238 + persister = dp 239 + } else { 240 + dbp, err := events.NewDbPersistence(db, cstore, nil) 241 + if err != nil { 242 + return fmt.Errorf("setting up db event persistence: %w", err) 243 + } 244 + persister = dbp 230 245 } 231 246 232 - evtman := events.NewEventManager(dbp) 247 + evtman := events.NewEventManager(persister) 233 248 234 249 notifman := &notifs.NullNotifs{} 235 250
+2 -2
events/dbpersist.go
··· 124 124 p.lk.Unlock() 125 125 126 126 if needsFlush { 127 - if err := p.FlushBatch(context.Background()); err != nil { 127 + if err := p.Flush(context.Background()); err != nil { 128 128 log.Errorf("failed to flush batch: %s", err) 129 129 } 130 130 } ··· 135 135 p.broadcast = brc 136 136 } 137 137 138 - func (p *DbPersistence) FlushBatch(ctx context.Context) error { 138 + func (p *DbPersistence) Flush(ctx context.Context) error { 139 139 p.lk.Lock() 140 140 defer p.lk.Unlock() 141 141 return p.flushBatchLocked(ctx)
+3 -3
events/dbpersist_test.go
··· 129 129 expectedEvtCount := b.N * numRoutines 130 130 131 131 // Flush manually 132 - err = dbp.FlushBatch(ctx) 132 + err = dbp.Flush(ctx) 133 133 if err != nil { 134 134 b.Fatal(err) 135 135 } ··· 249 249 expectedEvtCount := n * numRoutines 250 250 251 251 // Flush manually 252 - err = dbp.FlushBatch(ctx) 252 + err = dbp.Flush(ctx) 253 253 if err != nil { 254 254 b.Fatal(err) 255 255 } ··· 306 306 return nil, nil, nil, "", err 307 307 } 308 308 309 - return maindb, cardb, cs, "", nil 309 + return maindb, cardb, cs, dir, nil 310 310 }
+702 -8
events/diskpersist.go
··· 1 1 package events 2 2 3 + import ( 4 + "bufio" 5 + "bytes" 6 + "context" 7 + "encoding/binary" 8 + "errors" 9 + "fmt" 10 + "io" 11 + "os" 12 + "path/filepath" 13 + "sync" 14 + "time" 15 + 16 + "github.com/bluesky-social/indigo/api/atproto" 17 + "github.com/bluesky-social/indigo/models" 18 + lru "github.com/hashicorp/golang-lru" 19 + "gorm.io/gorm" 20 + ) 21 + 3 22 type DiskPersistence struct { 4 - dir string 23 + primaryDir string 24 + archiveDir string 25 + eventsPerFile int64 26 + writeBufferSize int 27 + 28 + evts chan *persistJob 29 + 30 + meta *gorm.DB 31 + 32 + broadcast func(*XRPCStreamEvent) 33 + 34 + logfi *os.File 35 + 36 + curSeq int64 37 + 38 + uidCache *lru.ARCCache 39 + didCache *lru.ARCCache 40 + 41 + buffers *sync.Pool 42 + scratch []byte 43 + 44 + outbuf *bytes.Buffer 45 + evtbuf []persistJob 46 + 47 + lk sync.Mutex 48 + } 49 + 50 + type persistJob struct { 51 + Buf []byte 52 + Evt *XRPCStreamEvent 53 + Buffer *bytes.Buffer // so we can put it back in the pool when we're done 54 + } 55 + 56 + type jobResult struct { 57 + Err error 58 + Seq int64 59 + } 60 + 61 + const ( 62 + EvtFlagTakedown = 1 << iota 63 + EvtFlagRebased 64 + ) 65 + 66 + var _ (EventPersistence) = (*DiskPersistence)(nil) 67 + 68 + type DiskPersistOptions struct { 69 + UIDCacheSize int 70 + DIDCacheSize int 71 + EventsPerFile int64 72 + WriteBufferSize int 73 + } 74 + 75 + func DefaultDiskPersistOptions() *DiskPersistOptions { 76 + return &DiskPersistOptions{ 77 + EventsPerFile: 10000, 78 + UIDCacheSize: 100000, 79 + DIDCacheSize: 100000, 80 + WriteBufferSize: 50, 81 + } 82 + } 83 + 84 + func NewDiskPersistence(primaryDir, archiveDir string, db *gorm.DB, opts *DiskPersistOptions) (*DiskPersistence, error) { 85 + if opts == nil { 86 + opts = DefaultDiskPersistOptions() 87 + } 88 + 89 + uidCache, err := lru.NewARC(opts.UIDCacheSize) 90 + if err != nil { 91 + return nil, fmt.Errorf("failed to create uid cache: %w", err) 92 + } 93 + 94 + didCache, err := lru.NewARC(opts.DIDCacheSize) 95 + if err != nil { 96 + return nil, fmt.Errorf("failed to create did cache: %w", err) 97 + } 98 + 99 + db.AutoMigrate(&LogFileRef{}) 100 + 101 + bufpool := &sync.Pool{ 102 + New: func() any { 103 + return new(bytes.Buffer) 104 + }, 105 + } 106 + 107 + dp := &DiskPersistence{ 108 + meta: db, 109 + primaryDir: primaryDir, 110 + archiveDir: archiveDir, 111 + buffers: bufpool, 112 + uidCache: uidCache, 113 + didCache: didCache, 114 + evts: make(chan *persistJob, 1024), 115 + eventsPerFile: opts.EventsPerFile, 116 + scratch: make([]byte, headerSize), 117 + outbuf: new(bytes.Buffer), 118 + writeBufferSize: opts.WriteBufferSize, 119 + } 120 + 121 + if err := dp.resumeLog(); err != nil { 122 + return nil, err 123 + } 124 + 125 + go dp.flushRoutine() 126 + 127 + return dp, nil 128 + } 129 + 130 + type LogFileRef struct { 131 + gorm.Model 132 + Path string 133 + Archived bool 134 + SeqStart int64 135 + } 136 + 137 + func (dp *DiskPersistence) resumeLog() error { 138 + var lfr LogFileRef 139 + if err := dp.meta.Order("seq_start desc").Limit(1).Find(&lfr).Error; err != nil { 140 + return err 141 + } 142 + 143 + if lfr.ID == 0 { 144 + // no files, start anew! 145 + return dp.initLogFile() 146 + } 147 + 148 + fi, err := os.Open(filepath.Join(dp.primaryDir, lfr.Path)) 149 + if err != nil { 150 + return err 151 + } 152 + 153 + seq, err := scanForLastSeq(fi, -1) 154 + if err != nil { 155 + return fmt.Errorf("failed to scan log file for last seqno: %w", err) 156 + } 157 + 158 + dp.curSeq = seq 159 + dp.logfi = fi 160 + 161 + return nil 162 + } 163 + 164 + func (dp *DiskPersistence) initLogFile() error { 165 + if err := os.MkdirAll(dp.primaryDir, 0775); err != nil { 166 + return err 167 + } 168 + 169 + p := filepath.Join(dp.primaryDir, "evts-0") 170 + fi, err := os.Create(p) 171 + if err != nil { 172 + return err 173 + } 174 + 175 + if err := dp.meta.Create(&LogFileRef{ 176 + Path: "evts-0", 177 + SeqStart: 0, 178 + }).Error; err != nil { 179 + return err 180 + } 181 + 182 + dp.logfi = fi 183 + dp.curSeq = 1 184 + return nil 185 + } 186 + 187 + // swapLog swaps the current log file out for a new empty one 188 + // must only be called while holding dp.lk 189 + func (dp *DiskPersistence) swapLog(ctx context.Context) error { 190 + if err := dp.logfi.Close(); err != nil { 191 + return fmt.Errorf("failed to close current log file: %w", err) 192 + } 193 + 194 + fname := fmt.Sprintf("evts-%d", dp.curSeq) 195 + nextp := filepath.Join(dp.primaryDir, fname) 196 + 197 + fi, err := os.Create(nextp) 198 + if err != nil { 199 + return err 200 + } 201 + 202 + if err := dp.meta.Create(&LogFileRef{ 203 + Path: fname, 204 + SeqStart: dp.curSeq, 205 + }).Error; err != nil { 206 + return err 207 + } 208 + 209 + dp.logfi = fi 210 + return nil 211 + } 212 + 213 + func scanForLastSeq(fi *os.File, end int64) (int64, error) { 214 + scratch := make([]byte, headerSize) 215 + 216 + var lastSeq int64 = -1 217 + var offset int64 218 + for { 219 + eh, err := readHeader(fi, scratch) 220 + if err != nil { 221 + if err == io.EOF { 222 + return lastSeq, nil 223 + } 224 + return 0, err 225 + } 226 + 227 + if end > 0 && eh.Seq > end { 228 + // return to beginning of offset 229 + n, err := fi.Seek(offset, io.SeekStart) 230 + if err != nil { 231 + return 0, err 232 + } 233 + 234 + if n != offset { 235 + return 0, fmt.Errorf("rewind seek failed") 236 + } 237 + 238 + return eh.Seq, nil 239 + } 240 + 241 + lastSeq = eh.Seq 242 + 243 + noff, err := fi.Seek(int64(eh.Len), io.SeekCurrent) 244 + if err != nil { 245 + return 0, err 246 + } 247 + 248 + if noff != offset+headerSize+int64(eh.Len) { 249 + // TODO: must recover from this 250 + return 0, fmt.Errorf("did not seek to next event properly") 251 + } 252 + 253 + offset = noff 254 + } 255 + } 256 + 257 + const ( 258 + evtKindCommit = 1 259 + evtKindHandle = 2 260 + ) 261 + 262 + var emptyHeader = make([]byte, headerSize) 263 + 264 + func (p *DiskPersistence) addJobToQueue(job persistJob) error { 265 + p.lk.Lock() 266 + defer p.lk.Unlock() 267 + 268 + if err := p.doPersist(job); err != nil { 269 + return err 270 + } 271 + 272 + // TODO: for some reason replacing this constant with p.writeBufferSize dramatically reduces perf... 273 + if len(p.evtbuf) > 400 { 274 + if err := p.flushLog(context.TODO()); err != nil { 275 + return fmt.Errorf("failed to flush disk log: %w", err) 276 + } 277 + } 278 + 279 + return nil 5 280 } 6 281 7 - func NewDiskPersistence(dir string) (*DiskPersistence, error) { 8 - return &DiskPersistence{ 9 - dir: dir, 282 + func (p *DiskPersistence) flushRoutine() { 283 + t := time.NewTicker(time.Millisecond * 100) 284 + 285 + for { 286 + select { 287 + case <-t.C: 288 + p.lk.Lock() 289 + if err := p.flushLog(context.TODO()); err != nil { 290 + // TODO: this happening is quite bad. Need a recovery strategy 291 + log.Errorf("failed to flush disk log: %s", err) 292 + } 293 + p.lk.Unlock() 294 + } 295 + } 296 + } 297 + 298 + func (p *DiskPersistence) flushLog(ctx context.Context) error { 299 + if len(p.evtbuf) == 0 { 300 + return nil 301 + } 302 + 303 + _, err := io.Copy(p.logfi, p.outbuf) 304 + if err != nil { 305 + return err 306 + } 307 + 308 + p.outbuf.Truncate(0) 309 + 310 + for _, ej := range p.evtbuf { 311 + p.broadcast(ej.Evt) 312 + ej.Buffer.Truncate(0) 313 + p.buffers.Put(ej.Buffer) 314 + } 315 + 316 + p.evtbuf = p.evtbuf[:0] 317 + 318 + return nil 319 + } 320 + 321 + func (p *DiskPersistence) doPersist(j persistJob) error { 322 + b := j.Buf 323 + e := j.Evt 324 + seq := p.curSeq 325 + p.curSeq++ 326 + 327 + binary.LittleEndian.PutUint64(b[20:], uint64(seq)) 328 + 329 + switch { 330 + case e.RepoCommit != nil: 331 + e.RepoCommit.Seq = seq 332 + case e.RepoHandle != nil: 333 + e.RepoHandle.Seq = seq 334 + default: 335 + // only those two get peristed right now 336 + // we shouldnt actually ever get here... 337 + return nil 338 + } 339 + 340 + // TODO: does this guarantee a full write? 341 + _, err := p.outbuf.Write(b) 342 + if err != nil { 343 + return err 344 + } 345 + 346 + p.evtbuf = append(p.evtbuf, j) 347 + 348 + if seq%p.eventsPerFile == 0 { 349 + if err := p.flushLog(context.TODO()); err != nil { 350 + return err 351 + } 352 + 353 + // time to roll the log file 354 + if err := p.swapLog(context.TODO()); err != nil { 355 + return err 356 + } 357 + } 358 + 359 + return nil 360 + } 361 + 362 + func (p *DiskPersistence) Persist(ctx context.Context, e *XRPCStreamEvent) error { 363 + buffer := p.buffers.Get().(*bytes.Buffer) 364 + 365 + buffer.Truncate(0) 366 + 367 + buffer.Write(emptyHeader) 368 + 369 + var did string 370 + var evtKind uint32 371 + switch { 372 + case e.RepoCommit != nil: 373 + evtKind = evtKindCommit 374 + did = e.RepoCommit.Repo 375 + if err := e.RepoCommit.MarshalCBOR(buffer); err != nil { 376 + return fmt.Errorf("failed to marshal: %w", err) 377 + } 378 + case e.RepoHandle != nil: 379 + evtKind = evtKindHandle 380 + did = e.RepoHandle.Did 381 + if err := e.RepoHandle.MarshalCBOR(buffer); err != nil { 382 + return fmt.Errorf("failed to marshal: %w", err) 383 + } 384 + default: 385 + return nil 386 + // only those two get peristed right now 387 + } 388 + 389 + usr, err := p.uidForDid(ctx, did) 390 + if err != nil { 391 + return err 392 + } 393 + 394 + b := buffer.Bytes() 395 + 396 + binary.LittleEndian.PutUint32(b, 0) 397 + binary.LittleEndian.PutUint32(b[4:], evtKind) 398 + binary.LittleEndian.PutUint32(b[8:], uint32(len(b)-headerSize)) 399 + binary.LittleEndian.PutUint64(b[12:], uint64(usr)) 400 + 401 + return p.addJobToQueue(persistJob{ 402 + Buf: b, 403 + Evt: e, 404 + Buffer: buffer, 405 + }) 406 + } 407 + 408 + type evtHeader struct { 409 + Flags uint32 410 + Kind uint32 411 + Seq int64 412 + Usr models.Uid 413 + Len uint32 414 + } 415 + 416 + func (eh *evtHeader) Len64() int64 { 417 + return int64(eh.Len) 418 + } 419 + 420 + const headerSize = 4 + 4 + 4 + 8 + 8 421 + 422 + func readHeader(r io.Reader, scratch []byte) (*evtHeader, error) { 423 + if len(scratch) < headerSize { 424 + return nil, fmt.Errorf("must pass scratch buffer of at least %d bytes", headerSize) 425 + } 426 + 427 + scratch = scratch[:headerSize] 428 + _, err := io.ReadFull(r, scratch) 429 + if err != nil { 430 + return nil, fmt.Errorf("reading header: %w", err) 431 + } 432 + 433 + flags := binary.LittleEndian.Uint32(scratch[:4]) 434 + kind := binary.LittleEndian.Uint32(scratch[4:8]) 435 + l := binary.LittleEndian.Uint32(scratch[8:12]) 436 + usr := binary.LittleEndian.Uint64(scratch[12:20]) 437 + seq := binary.LittleEndian.Uint64(scratch[20:28]) 438 + 439 + return &evtHeader{ 440 + Flags: flags, 441 + Kind: kind, 442 + Len: l, 443 + Usr: models.Uid(usr), 444 + Seq: int64(seq), 10 445 }, nil 11 446 } 12 447 13 - func (p *DiskPersistence) Persist(e *XRPCStreamEvent) error { 14 - panic("nyi") 448 + func (p *DiskPersistence) writeHeader(ctx context.Context, flags uint32, kind uint32, l uint32, usr uint64, seq int64) error { 449 + binary.LittleEndian.PutUint32(p.scratch, flags) 450 + binary.LittleEndian.PutUint32(p.scratch[4:], kind) 451 + binary.LittleEndian.PutUint32(p.scratch[8:], l) 452 + binary.LittleEndian.PutUint64(p.scratch[12:], usr) 453 + binary.LittleEndian.PutUint64(p.scratch[20:], uint64(seq)) 454 + 455 + nw, err := p.logfi.Write(p.scratch) 456 + if err != nil { 457 + return err 458 + } 459 + 460 + if nw != headerSize { 461 + return fmt.Errorf("only wrote %d bytes for header", nw) 462 + } 463 + 464 + return nil 465 + } 466 + 467 + func (p *DiskPersistence) uidForDid(ctx context.Context, did string) (models.Uid, error) { 468 + if uid, ok := p.didCache.Get(did); ok { 469 + return uid.(models.Uid), nil 470 + } 471 + 472 + var u models.ActorInfo 473 + if err := p.meta.First(&u, "did = ?", did).Error; err != nil { 474 + return 0, err 475 + } 476 + 477 + p.didCache.Add(did, u.Uid) 478 + 479 + return u.Uid, nil 480 + } 481 + 482 + func (p *DiskPersistence) Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error { 483 + base := since - (since * p.eventsPerFile) 484 + var logs []LogFileRef 485 + if err := p.meta.Debug().Order("seq_start asc").Find(&logs, "seq_start >= ?", base).Error; err != nil { 486 + return err 487 + } 488 + 489 + for _, lf := range logs { 490 + if err := p.readEventsFrom(ctx, since, filepath.Join(p.primaryDir, lf.Path), cb); err != nil { 491 + return err 492 + } 493 + since = 0 494 + } 495 + 496 + return nil 497 + } 498 + 499 + func postDoNotEmit(flags uint32) bool { 500 + if flags&(EvtFlagRebased|EvtFlagTakedown) != 0 { 501 + return true 502 + } 503 + 504 + return false 505 + } 506 + 507 + func (p *DiskPersistence) readEventsFrom(ctx context.Context, since int64, fn string, cb func(*XRPCStreamEvent) error) error { 508 + fi, err := os.OpenFile(fn, os.O_RDONLY, 0) 509 + if err != nil { 510 + return err 511 + } 512 + 513 + if since != 0 { 514 + _, err := scanForLastSeq(fi, since) 515 + if err != nil { 516 + return err 517 + } 518 + } 519 + 520 + bufr := bufio.NewReader(fi) 521 + 522 + scratch := make([]byte, headerSize) 523 + for { 524 + h, err := readHeader(bufr, scratch) 525 + if err != nil { 526 + if errors.Is(err, io.EOF) { 527 + return nil 528 + } 529 + 530 + return err 531 + } 532 + 533 + if postDoNotEmit(h.Flags) { 534 + // event taken down, skip 535 + _, err := io.CopyN(io.Discard, bufr, h.Len64()) // would be really nice if the buffered reader had a 'skip' method that does a seek under the hood 536 + if err != nil { 537 + return fmt.Errorf("failed while skipping event (seq: %d, fn: %q): %w", h.Seq, fn, err) 538 + } 539 + continue 540 + } 541 + 542 + switch h.Kind { 543 + case evtKindCommit: 544 + var evt atproto.SyncSubscribeRepos_Commit 545 + if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil { 546 + return err 547 + } 548 + evt.Seq = h.Seq 549 + if err := cb(&XRPCStreamEvent{RepoCommit: &evt}); err != nil { 550 + return err 551 + } 552 + case evtKindHandle: 553 + var evt atproto.SyncSubscribeRepos_Handle 554 + if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil { 555 + return err 556 + } 557 + evt.Seq = h.Seq 558 + if err := cb(&XRPCStreamEvent{RepoHandle: &evt}); err != nil { 559 + return err 560 + } 561 + default: 562 + log.Warnw("unrecognized event kind coming from log file", "seq", h.Seq, "kind", h.Kind) 563 + return fmt.Errorf("halting on unrecognized event kind") 564 + } 565 + } 566 + } 567 + 568 + type UserAction struct { 569 + gorm.Model 570 + 571 + Usr models.Uid 572 + RebaseAt int64 573 + Takedown bool 574 + } 575 + 576 + func (p *DiskPersistence) TakeDownRepo(ctx context.Context, usr models.Uid) error { 577 + /* 578 + if err := p.meta.Create(&UserAction{ 579 + Usr: usr, 580 + Takedown: true, 581 + }).Error; err != nil { 582 + return err 583 + } 584 + */ 585 + 586 + return p.forEachShardWithUserEvents(ctx, usr, func(ctx context.Context, fn string) error { 587 + if err := p.deleteEventsForUser(ctx, usr, fn); err != nil { 588 + return err 589 + } 590 + 591 + return nil 592 + }) 593 + } 594 + 595 + func (p *DiskPersistence) forEachShardWithUserEvents(ctx context.Context, usr models.Uid, cb func(context.Context, string) error) error { 596 + var refs []LogFileRef 597 + if err := p.meta.Order("created_at desc").Find(&refs).Error; err != nil { 598 + return err 599 + } 600 + 601 + for _, r := range refs { 602 + mhas, err := p.refMaybeHasUserEvents(ctx, usr, r) 603 + if err != nil { 604 + return err 605 + } 606 + 607 + if mhas { 608 + var path string 609 + if r.Archived { 610 + path = filepath.Join(p.archiveDir, r.Path) 611 + } else { 612 + path = filepath.Join(p.primaryDir, r.Path) 613 + } 614 + 615 + if err := cb(ctx, path); err != nil { 616 + return err 617 + } 618 + } 619 + } 620 + 621 + return nil 622 + } 623 + 624 + func (p *DiskPersistence) refMaybeHasUserEvents(ctx context.Context, usr models.Uid, ref LogFileRef) (bool, error) { 625 + // TODO: lazily computed bloom filters for users in each logfile 626 + return true, nil 15 627 } 16 628 17 - func (p *DiskPersistence) Playback(since int64, cb func(*XRPCStreamEvent) error) error { 18 - panic("nyi") 629 + type zeroReader struct{} 630 + 631 + func (zr *zeroReader) Read(p []byte) (n int, err error) { 632 + for i := range p { 633 + p[i] = 0 634 + } 635 + return len(p), nil 636 + } 637 + 638 + func (p *DiskPersistence) deleteEventsForUser(ctx context.Context, usr models.Uid, fn string) error { 639 + return p.mutateUserEventsInLog(ctx, usr, fn, EvtFlagTakedown, true) 640 + } 641 + 642 + func (p *DiskPersistence) mutateUserEventsInLog(ctx context.Context, usr models.Uid, fn string, flag uint32, zeroEvts bool) error { 643 + fi, err := os.OpenFile(fn, os.O_RDWR, 0) 644 + if err != nil { 645 + return fmt.Errorf("failed to open log file: %w", err) 646 + } 647 + defer fi.Close() 648 + defer fi.Sync() 649 + 650 + scratch := make([]byte, headerSize) 651 + var offset int64 652 + for { 653 + h, err := readHeader(fi, scratch) 654 + if err != nil { 655 + if errors.Is(err, io.EOF) { 656 + return nil 657 + } 658 + 659 + return err 660 + } 661 + 662 + if h.Usr == usr && h.Flags&flag == 0 { 663 + nflag := h.Flags | flag 664 + 665 + binary.LittleEndian.PutUint32(scratch, nflag) 666 + 667 + if _, err := fi.WriteAt(scratch[:4], offset); err != nil { 668 + return fmt.Errorf("failed to write updated flag value: %w", err) 669 + } 670 + 671 + if zeroEvts { 672 + // sync that write before blanking the event data 673 + if err := fi.Sync(); err != nil { 674 + return err 675 + } 676 + 677 + if _, err := fi.Seek(offset+headerSize, io.SeekStart); err != nil { 678 + return fmt.Errorf("failed to seek: %w", err) 679 + } 680 + 681 + _, err := io.CopyN(fi, &zeroReader{}, h.Len64()) 682 + if err != nil { 683 + return err 684 + } 685 + } 686 + } 687 + 688 + offset += headerSize + h.Len64() 689 + _, err = fi.Seek(offset, io.SeekStart) 690 + if err != nil { 691 + return fmt.Errorf("failed to seek: %w", err) 692 + } 693 + } 694 + } 695 + 696 + func (p *DiskPersistence) RebaseRepoEvents(ctx context.Context, usr models.Uid) error { 697 + return p.forEachShardWithUserEvents(ctx, usr, func(ctx context.Context, fn string) error { 698 + return p.mutateUserEventsInLog(ctx, usr, fn, EvtFlagRebased, false) 699 + }) 700 + } 701 + 702 + func (p *DiskPersistence) Flush(ctx context.Context) error { 703 + p.lk.Lock() 704 + defer p.lk.Unlock() 705 + if len(p.evtbuf) > 0 { 706 + return p.flushLog(ctx) 707 + } 708 + return nil 709 + } 710 + 711 + func (p *DiskPersistence) SetEventBroadcaster(f func(*XRPCStreamEvent)) { 712 + p.broadcast = f 19 713 }
+465
events/diskpersist_test.go
··· 1 + package events_test 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "os" 7 + "path/filepath" 8 + "reflect" 9 + "sync" 10 + "testing" 11 + "time" 12 + 13 + atproto "github.com/bluesky-social/indigo/api/atproto" 14 + "github.com/bluesky-social/indigo/api/bsky" 15 + "github.com/bluesky-social/indigo/carstore" 16 + "github.com/bluesky-social/indigo/events" 17 + lexutil "github.com/bluesky-social/indigo/lex/util" 18 + "github.com/bluesky-social/indigo/models" 19 + "github.com/bluesky-social/indigo/pds" 20 + "github.com/bluesky-social/indigo/repomgr" 21 + "github.com/bluesky-social/indigo/util" 22 + "gorm.io/gorm" 23 + ) 24 + 25 + func TestDiskPersist(t *testing.T) { 26 + ctx := context.Background() 27 + 28 + db, _, cs, tempPath, err := setupDBs(t) 29 + if err != nil { 30 + t.Fatal(err) 31 + } 32 + 33 + db.AutoMigrate(&pds.User{}) 34 + db.AutoMigrate(&pds.Peering{}) 35 + db.AutoMigrate(&models.ActorInfo{}) 36 + 37 + db.Create(&models.ActorInfo{ 38 + Uid: 1, 39 + Did: "did:example:123", 40 + }) 41 + 42 + mgr := repomgr.NewRepoManager(repomgr.NewDbHeadStore(db), cs, &util.FakeKeyManager{}) 43 + 44 + err = mgr.InitNewActor(ctx, 1, "alice", "did:example:123", "Alice", "", "") 45 + if err != nil { 46 + t.Fatal(err) 47 + } 48 + 49 + _, cid, err := mgr.CreateRecord(ctx, 1, "app.bsky.feed.post", &bsky.FeedPost{ 50 + Text: "hello world", 51 + CreatedAt: time.Now().Format(util.ISO8601), 52 + }) 53 + if err != nil { 54 + t.Fatal(err) 55 + } 56 + 57 + defer os.RemoveAll(tempPath) 58 + 59 + // Initialize a DBPersister 60 + 61 + dp, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ 62 + EventsPerFile: 10, 63 + UIDCacheSize: 100000, 64 + DIDCacheSize: 100000, 65 + }) 66 + if err != nil { 67 + t.Fatal(err) 68 + } 69 + 70 + // Create a bunch of events 71 + evtman := events.NewEventManager(dp) 72 + 73 + userRepoHead, err := mgr.GetRepoRoot(ctx, 1) 74 + if err != nil { 75 + t.Fatal(err) 76 + } 77 + 78 + n := 100 79 + inEvts := make([]*events.XRPCStreamEvent, n) 80 + for i := 0; i < n; i++ { 81 + cidLink := lexutil.LexLink(cid) 82 + headLink := lexutil.LexLink(userRepoHead) 83 + inEvts[i] = &events.XRPCStreamEvent{ 84 + RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 85 + Repo: "did:example:123", 86 + Commit: headLink, 87 + Ops: []*atproto.SyncSubscribeRepos_RepoOp{ 88 + { 89 + Action: "add", 90 + Cid: &cidLink, 91 + Path: "path1", 92 + }, 93 + }, 94 + Time: time.Now().Format(util.ISO8601), 95 + }, 96 + } 97 + } 98 + 99 + // Add events in parallel 100 + for i := 0; i < n; i++ { 101 + err = evtman.AddEvent(ctx, inEvts[i]) 102 + if err != nil { 103 + t.Fatal(err) 104 + } 105 + } 106 + 107 + if err := dp.Flush(ctx); err != nil { 108 + t.Fatal(err) 109 + } 110 + 111 + outEvtCount := 0 112 + expectedEvtCount := n 113 + 114 + dp.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { 115 + outEvtCount++ 116 + return nil 117 + }) 118 + 119 + if outEvtCount != expectedEvtCount { 120 + t.Fatalf("expected %d events, got %d", expectedEvtCount, outEvtCount) 121 + } 122 + } 123 + 124 + func BenchmarkDiskPersist(b *testing.B) { 125 + db, _, cs, tempPath, err := setupDBs(b) 126 + if err != nil { 127 + b.Fatal(err) 128 + } 129 + 130 + defer os.RemoveAll(tempPath) 131 + 132 + // Initialize a DBPersister 133 + 134 + dp, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ 135 + EventsPerFile: 5000, 136 + UIDCacheSize: 100000, 137 + DIDCacheSize: 100000, 138 + }) 139 + if err != nil { 140 + b.Fatal(err) 141 + } 142 + 143 + runPersisterBenchmark(b, cs, db, dp) 144 + } 145 + 146 + func runPersisterBenchmark(b *testing.B, cs *carstore.CarStore, db *gorm.DB, p events.EventPersistence) { 147 + ctx := context.Background() 148 + 149 + db.AutoMigrate(&pds.User{}) 150 + db.AutoMigrate(&pds.Peering{}) 151 + db.AutoMigrate(&models.ActorInfo{}) 152 + 153 + db.Create(&models.ActorInfo{ 154 + Uid: 1, 155 + Did: "did:example:123", 156 + }) 157 + 158 + mgr := repomgr.NewRepoManager(repomgr.NewDbHeadStore(db), cs, &util.FakeKeyManager{}) 159 + 160 + err := mgr.InitNewActor(ctx, 1, "alice", "did:example:123", "Alice", "", "") 161 + if err != nil { 162 + b.Fatal(err) 163 + } 164 + 165 + _, cid, err := mgr.CreateRecord(ctx, 1, "app.bsky.feed.post", &bsky.FeedPost{ 166 + Text: "hello world", 167 + CreatedAt: time.Now().Format(util.ISO8601), 168 + }) 169 + if err != nil { 170 + b.Fatal(err) 171 + } 172 + 173 + // Create a bunch of events 174 + evtman := events.NewEventManager(p) 175 + 176 + userRepoHead, err := mgr.GetRepoRoot(ctx, 1) 177 + if err != nil { 178 + b.Fatal(err) 179 + } 180 + 181 + inEvts := make([]*events.XRPCStreamEvent, b.N) 182 + for i := 0; i < b.N; i++ { 183 + cidLink := lexutil.LexLink(cid) 184 + headLink := lexutil.LexLink(userRepoHead) 185 + inEvts[i] = &events.XRPCStreamEvent{ 186 + RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 187 + Repo: "did:example:123", 188 + Commit: headLink, 189 + Ops: []*atproto.SyncSubscribeRepos_RepoOp{ 190 + { 191 + Action: "add", 192 + Cid: &cidLink, 193 + Path: "path1", 194 + }, 195 + }, 196 + Time: time.Now().Format(util.ISO8601), 197 + }, 198 + } 199 + } 200 + 201 + numRoutines := 4 202 + wg := sync.WaitGroup{} 203 + 204 + b.ResetTimer() 205 + 206 + errChan := make(chan error, numRoutines) 207 + 208 + // Add events in parallel 209 + for i := 0; i < numRoutines; i++ { 210 + wg.Add(1) 211 + go func() { 212 + defer wg.Done() 213 + for i := 0; i < b.N; i++ { 214 + err = evtman.AddEvent(ctx, inEvts[i]) 215 + if err != nil { 216 + errChan <- err 217 + } 218 + } 219 + }() 220 + } 221 + 222 + wg.Wait() 223 + close(errChan) 224 + 225 + // Check for errors 226 + for err := range errChan { 227 + if err != nil { 228 + b.Fatal(err) 229 + } 230 + } 231 + 232 + // Flush manually 233 + if err := p.Flush(ctx); err != nil { 234 + b.Fatal(err) 235 + } 236 + 237 + } 238 + 239 + func TestDiskPersister(t *testing.T) { 240 + db, _, cs, tempPath, err := setupDBs(t) 241 + if err != nil { 242 + t.Fatal(err) 243 + } 244 + 245 + defer os.RemoveAll(tempPath) 246 + 247 + // Initialize a DBPersister 248 + 249 + dp, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ 250 + EventsPerFile: 20, 251 + UIDCacheSize: 100000, 252 + DIDCacheSize: 100000, 253 + }) 254 + if err != nil { 255 + t.Fatal(err) 256 + } 257 + 258 + runEventManagerTest(t, cs, db, dp) 259 + } 260 + 261 + func runEventManagerTest(t *testing.T, cs *carstore.CarStore, db *gorm.DB, p events.EventPersistence) { 262 + ctx := context.Background() 263 + 264 + db.AutoMigrate(&pds.User{}) 265 + db.AutoMigrate(&pds.Peering{}) 266 + db.AutoMigrate(&models.ActorInfo{}) 267 + 268 + db.Create(&models.ActorInfo{ 269 + Uid: 1, 270 + Did: "did:example:123", 271 + }) 272 + 273 + mgr := repomgr.NewRepoManager(repomgr.NewDbHeadStore(db), cs, &util.FakeKeyManager{}) 274 + 275 + err := mgr.InitNewActor(ctx, 1, "alice", "did:example:123", "Alice", "", "") 276 + if err != nil { 277 + t.Fatal(err) 278 + } 279 + 280 + _, cid, err := mgr.CreateRecord(ctx, 1, "app.bsky.feed.post", &bsky.FeedPost{ 281 + Text: "hello world", 282 + CreatedAt: time.Now().Format(util.ISO8601), 283 + }) 284 + if err != nil { 285 + t.Fatal(err) 286 + } 287 + 288 + evtman := events.NewEventManager(p) 289 + 290 + userRepoHead, err := mgr.GetRepoRoot(ctx, 1) 291 + if err != nil { 292 + t.Fatal(err) 293 + } 294 + 295 + testSize := 100 // you can adjust this number as needed 296 + inEvts := make([]*events.XRPCStreamEvent, testSize) 297 + for i := 0; i < testSize; i++ { 298 + cidLink := lexutil.LexLink(cid) 299 + headLink := lexutil.LexLink(userRepoHead) 300 + inEvts[i] = &events.XRPCStreamEvent{ 301 + RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 302 + Repo: "did:example:123", 303 + Commit: headLink, 304 + Ops: []*atproto.SyncSubscribeRepos_RepoOp{ 305 + { 306 + Action: "add", 307 + Cid: &cidLink, 308 + Path: "path1", 309 + }, 310 + }, 311 + Time: time.Now().Format(util.ISO8601), 312 + }, 313 + } 314 + 315 + err = evtman.AddEvent(ctx, inEvts[i]) 316 + if err != nil { 317 + t.Fatal(err) 318 + } 319 + } 320 + 321 + // Flush manually 322 + if err := p.Flush(ctx); err != nil { 323 + t.Fatal(err) 324 + } 325 + 326 + outEvtCount := 0 327 + p.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { 328 + // Check that the contents of the output events match the input events 329 + if !reflect.DeepEqual(inEvts[outEvtCount], evt) { 330 + t.Logf("%v", inEvts[outEvtCount].RepoCommit) 331 + t.Logf("%v", evt.RepoCommit) 332 + t.Fatalf("Event content mismatch: expected %+v, got %+v", inEvts[outEvtCount], evt) 333 + } 334 + outEvtCount++ 335 + return nil 336 + }) 337 + 338 + if outEvtCount != testSize { 339 + t.Fatalf("expected %d events, got %d", testSize, outEvtCount) 340 + } 341 + } 342 + 343 + func TestDiskPersisterTakedowns(t *testing.T) { 344 + db, _, cs, tempPath, err := setupDBs(t) 345 + if err != nil { 346 + t.Fatal(err) 347 + } 348 + 349 + defer os.RemoveAll(tempPath) 350 + 351 + // Initialize a DBPersister 352 + 353 + dp, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ 354 + EventsPerFile: 10, 355 + UIDCacheSize: 100000, 356 + DIDCacheSize: 100000, 357 + }) 358 + if err != nil { 359 + t.Fatal(err) 360 + } 361 + 362 + runTakedownTest(t, cs, db, dp) 363 + } 364 + 365 + func runTakedownTest(t *testing.T, cs *carstore.CarStore, db *gorm.DB, p events.EventPersistence) { 366 + ctx := context.TODO() 367 + 368 + db.AutoMigrate(&pds.User{}) 369 + db.AutoMigrate(&pds.Peering{}) 370 + db.AutoMigrate(&models.ActorInfo{}) 371 + 372 + mgr := repomgr.NewRepoManager(repomgr.NewDbHeadStore(db), cs, &util.FakeKeyManager{}) 373 + 374 + // Create multiple users 375 + userCount := 10 376 + users := make([]*models.ActorInfo, userCount) 377 + for i := models.Uid(1); i <= models.Uid(userCount); i++ { 378 + did := fmt.Sprintf("did:example:%d", i) 379 + handle := fmt.Sprintf("user%d", i) 380 + users[i-1] = &models.ActorInfo{ 381 + Uid: i, 382 + Did: did, 383 + Handle: handle, 384 + } 385 + if err := db.Create(&users[i-1]).Error; err != nil { 386 + t.Fatal(err) 387 + } 388 + 389 + err := mgr.InitNewActor(ctx, i, handle, did, fmt.Sprintf("User%d", i), "", "") 390 + if err != nil { 391 + t.Fatal(err) 392 + } 393 + } 394 + 395 + evtman := events.NewEventManager(p) 396 + 397 + testSize := 100 // you can adjust this number as needed 398 + inEvts := make([]*events.XRPCStreamEvent, testSize*userCount) 399 + for i := 0; i < testSize*userCount; i++ { 400 + user := users[i%userCount] 401 + _, cid, err := mgr.CreateRecord(ctx, user.Uid, "app.bsky.feed.post", &bsky.FeedPost{ 402 + Text: fmt.Sprintf("hello world from user %d", user.Uid), 403 + CreatedAt: time.Now().Format(util.ISO8601), 404 + }) 405 + if err != nil { 406 + t.Fatal(err) 407 + } 408 + 409 + userRepoHead, err := mgr.GetRepoRoot(ctx, user.Uid) 410 + if err != nil { 411 + t.Fatal(err) 412 + } 413 + 414 + cidLink := lexutil.LexLink(cid) 415 + headLink := lexutil.LexLink(userRepoHead) 416 + inEvts[i] = &events.XRPCStreamEvent{ 417 + RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 418 + Repo: user.Did, 419 + Commit: headLink, 420 + Ops: []*atproto.SyncSubscribeRepos_RepoOp{ 421 + { 422 + Action: "add", 423 + Cid: &cidLink, 424 + Path: "path1", 425 + }, 426 + }, 427 + Time: time.Now().Format(util.ISO8601), 428 + }, 429 + } 430 + 431 + err = evtman.AddEvent(ctx, inEvts[i]) 432 + if err != nil { 433 + t.Fatal(err) 434 + } 435 + } 436 + 437 + // Flush manually 438 + if err := p.Flush(ctx); err != nil { 439 + t.Fatal(err) 440 + } 441 + 442 + // Pick a user to take down 443 + takeDownUser := users[5] // For example, user with UID 6 (0-indexed) 444 + 445 + if err := evtman.TakeDownRepo(ctx, takeDownUser.Uid); err != nil { 446 + t.Fatal(err) 447 + } 448 + 449 + // Verify that the events of the user have been removed from the event stream 450 + var evtsCount int 451 + if err := p.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { 452 + evtsCount++ 453 + if evt.RepoCommit.Repo == takeDownUser.Did { 454 + t.Fatalf("found event for user %d after takedown", takeDownUser.Uid) 455 + } 456 + return nil 457 + }); err != nil { 458 + t.Fatal(err) 459 + } 460 + 461 + exp := testSize * (userCount - 1) 462 + if evtsCount != exp { 463 + t.Fatalf("wrong number of events out: %d != %d", evtsCount, exp) 464 + } 465 + }
+5
events/persist.go
··· 14 14 Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error 15 15 TakeDownRepo(ctx context.Context, usr models.Uid) error 16 16 RebaseRepoEvents(ctx context.Context, usr models.Uid) error 17 + Flush(context.Context) error 17 18 18 19 SetEventBroadcaster(func(*XRPCStreamEvent)) 19 20 } ··· 83 84 84 85 func (mp *MemPersister) RebaseRepoEvents(ctx context.Context, usr models.Uid) error { 85 86 return fmt.Errorf("repo rebases not currently supported by memory persister, test usage only") 87 + } 88 + 89 + func (mp *MemPersister) Flush(ctx context.Context) error { 90 + return nil 86 91 } 87 92 88 93 func (mp *MemPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent)) {
+4
events/yolopersist.go
··· 59 59 func (yp *YoloPersister) SetEventBroadcaster(brc func(*XRPCStreamEvent)) { 60 60 yp.broadcast = brc 61 61 } 62 + 63 + func (yp *YoloPersister) Flush(ctx context.Context) error { 64 + return nil 65 + }
+1 -1
models/uid.go
··· 1 1 package models 2 2 3 - type Uid uint 3 + type Uid uint64
+12
repomgr/bench_test.go
··· 25 25 t.Fatal(err) 26 26 } 27 27 28 + if err := db.Exec("PRAGMA synchronous=normal;").Error; err != nil { 29 + t.Fatal(err) 30 + } 31 + 32 + if err := db.Exec("PRAGMA temp_store=memory;").Error; err != nil { 33 + t.Fatal(err) 34 + } 35 + 36 + if err := db.Exec("PRAGMA mmap_size=3000000000;").Error; err != nil { 37 + t.Fatal(err) 38 + } 39 + 28 40 return db 29 41 } 30 42
+5 -6
testing/utils.go
··· 429 429 430 430 notifman := notifs.NewNotificationManager(maindb, repoman.GetRecord) 431 431 432 - dbpersist, err := events.NewDbPersistence(maindb, cs, nil) 433 - if err != nil { 434 - return nil, err 435 - } 432 + opts := events.DefaultDiskPersistOptions() 433 + opts.EventsPerFile = 10 434 + diskpersist, err := events.NewDiskPersistence(filepath.Join(dir, "dp-primary"), filepath.Join(dir, "dp-archive"), maindb, opts) 436 435 437 - evtman := events.NewEventManager(dbpersist) 436 + evtman := events.NewEventManager(diskpersist) 438 437 439 438 ix, err := indexer.NewIndexer(maindb, notifman, evtman, didr, repoman, true, true) 440 439 if err != nil { ··· 527 526 go func() { 528 527 rsc := &events.RepoStreamCallbacks{ 529 528 RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { 530 - fmt.Println("received event: ", evt.Seq, evt.Repo) 531 529 es.Lk.Lock() 532 530 es.Events = append(es.Events, &events.XRPCStreamEvent{RepoCommit: evt}) 531 + fmt.Println("received event: ", evt.Seq, evt.Repo, len(es.Events)) 533 532 es.Lk.Unlock() 534 533 return nil 535 534 },