this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

implement repo takedowns

+303 -7
+170 -7
events/diskpersist.go
··· 58 58 Seq int64 59 59 } 60 60 61 + const ( 62 + EvtFlagTakedown = 1 << iota 63 + ) 64 + 61 65 var _ (EventPersistence) = (*DiskPersistence)(nil) 62 66 63 67 type DiskPersistOptions struct { ··· 254 258 255 259 var emptyHeader = make([]byte, headerSize) 256 260 261 + func (p *DiskPersistence) addJobsToQueue(jobs []persistJob) error { 262 + p.lk.Lock() 263 + defer p.lk.Unlock() 264 + 265 + for _, job := range jobs { 266 + if err := p.doPersist(job); err != nil { 267 + return err 268 + } 269 + 270 + // TODO: for some reason replacing this constant with p.writeBufferSize dramatically reduces perf... 271 + if len(p.evtbuf) > 400 { 272 + if err := p.flushLog(context.TODO()); err != nil { 273 + return fmt.Errorf("failed to flush disk log: %w", err) 274 + } 275 + } 276 + } 277 + 278 + return nil 279 + } 280 + 257 281 func (p *DiskPersistence) addJobToQueue(job persistJob) error { 258 282 p.lk.Lock() 259 283 defer p.lk.Unlock() ··· 302 326 303 327 for _, ej := range p.evtbuf { 304 328 p.broadcast(ej.Evt) 329 + ej.Buffer.Truncate(0) 305 330 p.buffers.Put(ej.Buffer) 306 331 } 307 332 ··· 316 341 seq := p.curSeq 317 342 p.curSeq++ 318 343 319 - binary.LittleEndian.PutUint64(b[12:], uint64(seq)) 344 + binary.LittleEndian.PutUint64(b[20:], uint64(seq)) 320 345 321 346 switch { 322 347 case e.RepoCommit != nil: ··· 358 383 359 384 buffer.Write(emptyHeader) 360 385 386 + var did string 361 387 var evtKind uint32 362 388 switch { 363 389 case e.RepoCommit != nil: 364 390 evtKind = evtKindCommit 391 + did = e.RepoCommit.Repo 365 392 if err := e.RepoCommit.MarshalCBOR(buffer); err != nil { 366 393 return fmt.Errorf("failed to marshal: %w", err) 367 394 } 368 395 case e.RepoHandle != nil: 369 396 evtKind = evtKindHandle 397 + did = e.RepoHandle.Did 370 398 if err := e.RepoHandle.MarshalCBOR(buffer); err != nil { 371 399 return fmt.Errorf("failed to marshal: %w", err) 372 400 } ··· 375 403 // only those two get peristed right now 376 404 } 377 405 406 + usr, err := p.uidForDid(ctx, did) 407 + if err != nil { 408 + return err 409 + } 410 + 378 411 b := buffer.Bytes() 379 412 380 413 binary.LittleEndian.PutUint32(b, 0) 381 414 binary.LittleEndian.PutUint32(b[4:], evtKind) 382 415 binary.LittleEndian.PutUint32(b[8:], uint32(len(b)-headerSize)) 416 + binary.LittleEndian.PutUint64(b[12:], uint64(usr)) 383 417 384 418 return p.addJobToQueue(persistJob{ 385 419 Buf: b, ··· 392 426 Flags uint32 393 427 Kind uint32 394 428 Seq int64 429 + Usr models.Uid 395 430 Len int64 396 431 } 397 432 398 - const headerSize = 4 + 4 + 4 + 8 433 + const headerSize = 4 + 4 + 4 + 8 + 8 399 434 400 435 func readHeader(r io.Reader, scratch []byte) (*evtHeader, error) { 401 436 if len(scratch) < headerSize { ··· 411 446 flags := binary.LittleEndian.Uint32(scratch[:4]) 412 447 kind := binary.LittleEndian.Uint32(scratch[4:8]) 413 448 l := binary.LittleEndian.Uint32(scratch[8:12]) 414 - seq := binary.LittleEndian.Uint64(scratch[12:20]) 449 + usr := binary.LittleEndian.Uint64(scratch[12:20]) 450 + seq := binary.LittleEndian.Uint64(scratch[20:28]) 415 451 416 452 return &evtHeader{ 417 453 Flags: flags, 418 454 Kind: kind, 419 455 Len: int64(l), 456 + Usr: models.Uid(usr), 420 457 Seq: int64(seq), 421 458 }, nil 422 459 } 423 460 424 - func (p *DiskPersistence) writeHeader(ctx context.Context, flags uint32, kind uint32, l uint32, seq int64) error { 461 + func (p *DiskPersistence) writeHeader(ctx context.Context, flags uint32, kind uint32, l uint32, usr uint64, seq int64) error { 425 462 binary.LittleEndian.PutUint32(p.scratch, flags) 426 463 binary.LittleEndian.PutUint32(p.scratch[4:], kind) 427 464 binary.LittleEndian.PutUint32(p.scratch[8:], l) 428 - binary.LittleEndian.PutUint64(p.scratch[12:], uint64(seq)) 465 + binary.LittleEndian.PutUint64(p.scratch[12:], usr) 466 + binary.LittleEndian.PutUint64(p.scratch[20:], uint64(seq)) 429 467 430 468 nw, err := p.logfi.Write(p.scratch) 431 469 if err != nil { ··· 496 534 return err 497 535 } 498 536 537 + if h.Flags&EvtFlagTakedown != 0 { 538 + // event taken down, skip 539 + _, err := io.CopyN(io.Discard, bufr, h.Len) // would be really nice if the buffered reader had a 'skip' method that does a seek under the hood 540 + if err != nil { 541 + return fmt.Errorf("failed while skipping event (seq: %d, fn: %q): %w", h.Seq, fn, err) 542 + } 543 + } 544 + 499 545 switch h.Kind { 500 546 case evtKindCommit: 501 547 var evt atproto.SyncSubscribeRepos_Commit ··· 522 568 } 523 569 } 524 570 571 + type UserAction struct { 572 + gorm.Model 573 + 574 + Usr models.Uid 575 + RebaseAt int64 576 + Takedown bool 577 + } 578 + 525 579 func (p *DiskPersistence) TakeDownRepo(ctx context.Context, usr models.Uid) error { 526 - panic("no") 580 + /* 581 + if err := p.meta.Create(&UserAction{ 582 + Usr: usr, 583 + Takedown: true, 584 + }).Error; err != nil { 585 + return err 586 + } 587 + */ 588 + 589 + return p.forEachShardWithUserEvents(ctx, usr, func(ctx context.Context, fn string) error { 590 + if err := p.deleteEventsForUser(ctx, usr, fn); err != nil { 591 + return err 592 + } 593 + 594 + return nil 595 + }) 596 + } 597 + 598 + func (p *DiskPersistence) forEachShardWithUserEvents(ctx context.Context, usr models.Uid, cb func(context.Context, string) error) error { 599 + var refs []LogFileRef 600 + if err := p.meta.Order("created_at desc").Find(&refs).Error; err != nil { 601 + return err 602 + } 603 + 604 + for _, r := range refs { 605 + mhas, err := p.refMaybeHasUserEvents(ctx, usr, r) 606 + if err != nil { 607 + return err 608 + } 609 + 610 + if mhas { 611 + var path string 612 + if r.Archived { 613 + path = filepath.Join(p.archiveDir, r.Path) 614 + } else { 615 + path = filepath.Join(p.primaryDir, r.Path) 616 + } 617 + 618 + if err := cb(ctx, path); err != nil { 619 + return err 620 + } 621 + } 622 + } 623 + 624 + return nil 625 + } 626 + 627 + func (p *DiskPersistence) refMaybeHasUserEvents(ctx context.Context, usr models.Uid, ref LogFileRef) (bool, error) { 628 + // TODO: lazily computed bloom filters for users in each logfile 629 + return true, nil 630 + } 631 + 632 + type zeroReader struct{} 633 + 634 + func (zr *zeroReader) Read(p []byte) (n int, err error) { 635 + for i := range p { 636 + p[i] = 0 637 + } 638 + return len(p), nil 639 + } 640 + 641 + func (p *DiskPersistence) deleteEventsForUser(ctx context.Context, usr models.Uid, fn string) error { 642 + fi, err := os.OpenFile(fn, os.O_RDWR, 0) 643 + if err != nil { 644 + return fmt.Errorf("failed to open log file: %w", err) 645 + } 646 + defer fi.Close() 647 + 648 + scratch := make([]byte, headerSize) 649 + var offset int64 650 + for { 651 + h, err := readHeader(fi, scratch) 652 + if err != nil { 653 + if errors.Is(err, io.EOF) { 654 + return nil 655 + } 656 + 657 + return err 658 + } 659 + 660 + if h.Usr == usr && h.Flags&EvtFlagTakedown == 0 { 661 + nflag := h.Flags | EvtFlagTakedown 662 + 663 + binary.LittleEndian.PutUint32(scratch, nflag) 664 + 665 + if _, err := fi.WriteAt(scratch[:4], offset); err != nil { 666 + return fmt.Errorf("failed to write updated flag value: %w", err) 667 + } 668 + 669 + // sync that write before blanking the event data 670 + if err := fi.Sync(); err != nil { 671 + return err 672 + } 673 + 674 + if _, err := fi.Seek(offset+headerSize, io.SeekStart); err != nil { 675 + return fmt.Errorf("failed to seek: %w", err) 676 + } 677 + 678 + _, err := io.CopyN(fi, &zeroReader{}, h.Len) 679 + if err != nil { 680 + return err 681 + } 682 + } 683 + 684 + offset += headerSize + h.Len 685 + _, err = fi.Seek(offset, io.SeekStart) 686 + if err != nil { 687 + return fmt.Errorf("failed to seek: %w", err) 688 + } 689 + } 527 690 } 528 691 529 692 func (p *DiskPersistence) RebaseRepoEvents(ctx context.Context, usr models.Uid) error { 530 - panic("no") 693 + panic("todo") 531 694 } 532 695 533 696 func (p *DiskPersistence) Flush(ctx context.Context) error {
+114
events/diskpersist_test.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "fmt" 5 6 "os" 6 7 "path/filepath" 7 8 "reflect" ··· 326 327 p.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { 327 328 // Check that the contents of the output events match the input events 328 329 if !reflect.DeepEqual(inEvts[outEvtCount], evt) { 330 + t.Logf("%v", inEvts[outEvtCount].RepoCommit) 331 + t.Logf("%v", evt.RepoCommit) 329 332 t.Fatalf("Event content mismatch: expected %+v, got %+v", inEvts[outEvtCount], evt) 330 333 } 331 334 outEvtCount++ ··· 336 339 t.Fatalf("expected %d events, got %d", testSize, outEvtCount) 337 340 } 338 341 } 342 + 343 + func TestDiskPersisterTakedowns(t *testing.T) { 344 + db, _, cs, tempPath, err := setupDBs(t) 345 + if err != nil { 346 + t.Fatal(err) 347 + } 348 + 349 + defer os.RemoveAll(tempPath) 350 + 351 + // Initialize a DBPersister 352 + 353 + dp, err := events.NewDiskPersistence(filepath.Join(tempPath, "diskPrimary"), filepath.Join(tempPath, "diskArchive"), db, &events.DiskPersistOptions{ 354 + EventsPerFile: 10, 355 + UIDCacheSize: 100000, 356 + DIDCacheSize: 100000, 357 + }) 358 + if err != nil { 359 + t.Fatal(err) 360 + } 361 + 362 + runTakedownTest(t, cs, db, dp) 363 + } 364 + 365 + func runTakedownTest(t *testing.T, cs *carstore.CarStore, db *gorm.DB, p events.EventPersistence) { 366 + ctx := context.TODO() 367 + 368 + db.AutoMigrate(&pds.User{}) 369 + db.AutoMigrate(&pds.Peering{}) 370 + db.AutoMigrate(&models.ActorInfo{}) 371 + 372 + mgr := repomgr.NewRepoManager(repomgr.NewDbHeadStore(db), cs, &util.FakeKeyManager{}) 373 + 374 + // Create multiple users 375 + userCount := 10 376 + users := make([]*models.ActorInfo, userCount) 377 + for i := models.Uid(1); i <= models.Uid(userCount); i++ { 378 + users[i-1] = &models.ActorInfo{ 379 + Uid: i, 380 + Did: fmt.Sprintf("did:example:%d", i), 381 + } 382 + db.Create(users[i-1]) 383 + 384 + err := mgr.InitNewActor(ctx, i, fmt.Sprintf("user%d", i), users[i-1].Did, fmt.Sprintf("User%d", i), "", "") 385 + if err != nil { 386 + t.Fatal(err) 387 + } 388 + } 389 + 390 + evtman := events.NewEventManager(p) 391 + 392 + testSize := 100 // you can adjust this number as needed 393 + inEvts := make([]*events.XRPCStreamEvent, testSize*userCount) 394 + for i := 0; i < testSize*userCount; i++ { 395 + user := users[i%userCount] 396 + _, cid, err := mgr.CreateRecord(ctx, user.Uid, "app.bsky.feed.post", &bsky.FeedPost{ 397 + Text: fmt.Sprintf("hello world from user %d", user.Uid), 398 + CreatedAt: time.Now().Format(util.ISO8601), 399 + }) 400 + if err != nil { 401 + t.Fatal(err) 402 + } 403 + 404 + userRepoHead, err := mgr.GetRepoRoot(ctx, user.Uid) 405 + if err != nil { 406 + t.Fatal(err) 407 + } 408 + 409 + cidLink := lexutil.LexLink(cid) 410 + headLink := lexutil.LexLink(userRepoHead) 411 + inEvts[i] = &events.XRPCStreamEvent{ 412 + RepoCommit: &atproto.SyncSubscribeRepos_Commit{ 413 + Repo: user.Did, 414 + Commit: headLink, 415 + Ops: []*atproto.SyncSubscribeRepos_RepoOp{ 416 + { 417 + Action: "add", 418 + Cid: &cidLink, 419 + Path: "path1", 420 + }, 421 + }, 422 + Time: time.Now().Format(util.ISO8601), 423 + }, 424 + } 425 + 426 + err = evtman.AddEvent(ctx, inEvts[i]) 427 + if err != nil { 428 + t.Fatal(err) 429 + } 430 + } 431 + 432 + // Flush manually 433 + if err := p.Flush(ctx); err != nil { 434 + t.Fatal(err) 435 + } 436 + 437 + // Pick a user to take down 438 + takeDownUser := users[5] // For example, user with UID 6 (0-indexed) 439 + 440 + if err := evtman.TakeDownRepo(ctx, takeDownUser.Uid); err != nil { 441 + t.Fatal(err) 442 + } 443 + 444 + // Verify that the events of the user have been removed from the event stream 445 + var evtsCount int 446 + p.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error { 447 + if evt.RepoCommit.Repo == takeDownUser.Did { 448 + t.Fatalf("found event for user %d after takedown", takeDownUser.Uid) 449 + } 450 + return nil 451 + }) 452 + }
+19
events/events.go
··· 78 78 } 79 79 } 80 80 81 + type batchPersister interface { 82 + PersistMany(ctx context.Context, evts []*XRPCStreamEvent) error 83 + } 84 + 85 + func (em *EventManager) persistAndSendEvents(ctx context.Context, evts []*XRPCStreamEvent) { 86 + pm := em.persister.(batchPersister) 87 + if err := pm.PersistMany(ctx, evts); err != nil { 88 + log.Errorf("failed to persist outbound events: %s", err) 89 + } 90 + } 91 + 81 92 type Subscriber struct { 82 93 outgoing chan *XRPCStreamEvent 83 94 ··· 122 133 defer span.End() 123 134 124 135 em.persistAndSendEvent(ctx, ev) 136 + return nil 137 + } 138 + 139 + func (em *EventManager) AddEventBatch(ctx context.Context, evs []*XRPCStreamEvent) error { 140 + ctx, span := otel.Tracer("events").Start(ctx, "AddEventBatch") 141 + defer span.End() 142 + 143 + em.persistAndSendEvents(ctx, evs) 125 144 return nil 126 145 } 127 146