this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

implement repo takedowns (#161)

Adds a basic admin route to fully take down a given repo, including
marking the user as 'taken down' and fully deleting all associated data.

authored by

Whyrusleeping and committed by
GitHub
da6d879e 696c2527

+225 -23
+13 -15
bgs/admin.go
··· 3 3 import ( 4 4 "strconv" 5 5 6 - "github.com/bluesky-social/indigo/util" 7 6 "github.com/labstack/echo/v4" 8 7 ) 9 8 10 - func (bgs *BGS) handleAdminDeleteRecord(e echo.Context) error { 11 - puri, err := util.ParseAtUri(e.QueryParam("uri")) 12 - if err != nil { 13 - return err 14 - } 15 - 16 - _ = puri 17 - 18 - panic("TODO") 19 - } 20 - 21 9 func (bgs *BGS) handleAdminBlockRepoStream(e echo.Context) error { 22 10 panic("TODO") 23 11 } 24 12 25 - func (bgs *BGS) handleAdminDisableNewSlurps(e echo.Context) error { 13 + func (bgs *BGS) handleAdminSetSubsEnabled(e echo.Context) error { 26 14 enabled, err := strconv.ParseBool(e.QueryParam("enabled")) 27 15 if err != nil { 28 16 return err ··· 31 19 return bgs.slurper.SetNewSubsDisabled(!enabled) 32 20 } 33 21 34 - func (bgs *BGS) handleAdminTakedownRepo(e echo.Context) error { 35 - panic("TODO") 22 + func (bgs *BGS) handleAdminTakeDownRepo(e echo.Context) error { 23 + did := e.QueryParam("did") 24 + ctx := e.Request().Context() 25 + 26 + return bgs.TakeDownRepo(ctx, did) 27 + } 28 + 29 + func (bgs *BGS) handleAdminReverseTakedown(e echo.Context) error { 30 + did := e.QueryParam("did") 31 + ctx := e.Request().Context() 32 + 33 + return bgs.ReverseTakedown(ctx, did) 36 34 }
+47 -1
bgs/bgs.go
··· 198 198 e.GET("/xrpc/_health", bgs.HandleHealthCheck) 199 199 200 200 admin := e.Group("/admin", bgs.checkAdminAuth) 201 - admin.POST("/deleteRecord", bgs.handleAdminDeleteRecord) 201 + admin.POST("/subs/setEnabled", bgs.handleAdminSetSubsEnabled) 202 + admin.POST("/repo/takeDown", bgs.handleAdminTakeDownRepo) 203 + admin.POST("/repo/reverseTakedown", bgs.handleAdminReverseTakedown) 202 204 203 205 return e.Start(listen) 204 206 } ··· 281 283 Handle string `gorm:"uniqueIndex"` 282 284 Did string `gorm:"uniqueIndex"` 283 285 PDS uint 286 + 287 + // TakenDown is set to true if the user in question has been taken down. 288 + // A user in this state will have all future events related to it dropped 289 + // and no data about this user will be served. 290 + TakenDown bool 284 291 } 285 292 286 293 type addTargetBody struct { ··· 427 434 u.Did = evt.Repo 428 435 } 429 436 437 + if u.TakenDown { 438 + log.Infow("dropping event from taken down user", "did", evt.Repo, "seq", evt.Seq, "host", host.Host) 439 + return nil 440 + } 441 + 430 442 // TODO: if the user is already in the 'slow' path, we shouldnt even bother trying to fast path this event 431 443 432 444 if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, (*cid.Cid)(evt.Prev), evt.Blocks); err != nil { ··· 661 673 662 674 return subj, nil 663 675 } 676 + 677 + func (bgs *BGS) TakeDownRepo(ctx context.Context, did string) error { 678 + u, err := bgs.lookupUserByDid(ctx, did) 679 + if err != nil { 680 + return err 681 + } 682 + 683 + if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("taken_down", true).Error; err != nil { 684 + return err 685 + } 686 + 687 + if err := bgs.repoman.TakeDownRepo(ctx, u.ID); err != nil { 688 + return err 689 + } 690 + 691 + if err := bgs.events.TakeDownRepo(ctx, u.ID); err != nil { 692 + return err 693 + } 694 + 695 + return nil 696 + } 697 + 698 + func (bgs *BGS) ReverseTakedown(ctx context.Context, did string) error { 699 + u, err := bgs.lookupUserByDid(ctx, did) 700 + if err != nil { 701 + return err 702 + } 703 + 704 + if err := bgs.db.Model(User{}).Where("id = ?", u.ID).Update("taken_down", false).Error; err != nil { 705 + return err 706 + } 707 + 708 + return nil 709 + }
+32 -4
carstore/bs.go
··· 484 484 return ds.base.GetSize(ctx, c) 485 485 } 486 486 487 + func fnameForShard(user util.Uid, seq int) string { 488 + return fmt.Sprintf("sh-%d-%d", user, seq) 489 + } 487 490 func (cs *CarStore) openNewShardFile(ctx context.Context, user util.Uid, seq int) (*os.File, string, error) { 488 491 // TODO: some overwrite protections 489 - fname := filepath.Join(cs.rootDir, fmt.Sprintf("sh-%d-%d", user, seq)) 492 + fname := filepath.Join(cs.rootDir, fnameForShard(user, seq)) 490 493 fi, err := os.Create(fname) 491 494 if err != nil { 492 495 return nil, "", err ··· 497 500 498 501 func (cs *CarStore) writeNewShardFile(ctx context.Context, user util.Uid, seq int, data []byte) (string, error) { 499 502 // TODO: some overwrite protections 500 - fname := filepath.Join(cs.rootDir, fmt.Sprintf("sh-%d-%d", user, seq)) 503 + fname := filepath.Join(cs.rootDir, fnameForShard(user, seq)) 501 504 if err := os.WriteFile(fname, data, 0664); err != nil { 502 505 return "", err 503 506 } 504 507 505 508 return fname, nil 509 + } 510 + 511 + func (cs *CarStore) deleteShardFile(ctx context.Context, sh *CarShard) error { 512 + return os.Remove(fnameForShard(sh.Usr, sh.Seq)) 506 513 } 507 514 508 515 // CloseWithRoot writes all new blocks in a car file to the writer with the ··· 701 708 return false, err 702 709 } 703 710 704 - if maybeShard.ID == lastShard.ID { 711 + if maybeShard.ID != 0 && maybeShard.ID == lastShard.ID { 705 712 // somehow we are checking if a valid 'append' is a fork, seems buggy, throw an error 706 - return false, fmt.Errorf("invariant broken: checked for forkiness of a valid append") 713 + return false, fmt.Errorf("invariant broken: checked for forkiness of a valid append (%d - %d)", lastShard.ID, maybeShard.ID) 707 714 } 708 715 709 716 if maybeShard.ID == 0 { ··· 712 719 713 720 return true, nil 714 721 } 722 + 723 + func (cs *CarStore) TakeDownRepo(ctx context.Context, user util.Uid) error { 724 + var shards []CarShard 725 + if err := cs.meta.Find(&shards, "usr = ?", user).Error; err != nil { 726 + return err 727 + } 728 + 729 + for _, sh := range shards { 730 + if err := cs.deleteShardFile(ctx, &sh); err != nil { 731 + if !os.IsNotExist(err) { 732 + return err 733 + } 734 + } 735 + } 736 + 737 + if err := cs.meta.Delete(&CarShard{}, "usr = ?", user).Error; err != nil { 738 + return err 739 + } 740 + 741 + return nil 742 + }
+23
cmd/gosky/main.go
··· 1012 1012 return err 1013 1013 } 1014 1014 repob = rrb 1015 + } else if strings.HasPrefix(cctx.Args().First(), "at://") { 1016 + xrpcc, err := cliutil.GetXrpcClient(cctx, false) 1017 + if err != nil { 1018 + return err 1019 + } 1020 + 1021 + puri, err := util.ParseAtUri(cctx.Args().First()) 1022 + if err != nil { 1023 + return err 1024 + } 1025 + 1026 + out, err := comatproto.RepoGetRecord(ctx, xrpcc, "", puri.Collection, puri.Did, puri.Rkey) 1027 + if err != nil { 1028 + return err 1029 + } 1030 + 1031 + b, err := json.MarshalIndent(out.Value.Val, "", " ") 1032 + if err != nil { 1033 + return err 1034 + } 1035 + 1036 + fmt.Println(string(b)) 1037 + return nil 1015 1038 } else { 1016 1039 fb, err := os.ReadFile(rfi) 1017 1040 if err != nil {
+21 -1
events/dbpersist.go
··· 147 147 148 148 ra, err := p.hydrateRepoEvent(ctx, &evt) 149 149 if err != nil { 150 - return err 150 + return fmt.Errorf("hydrating event: %w", err) 151 151 } 152 152 153 153 if err := cb(&XRPCStreamEvent{RepoCommit: ra}); err != nil { ··· 255 255 256 256 return buf.Bytes(), nil 257 257 } 258 + 259 + func (p *DbPersistence) TakeDownRepo(ctx context.Context, usr util.Uid) error { 260 + for { 261 + q := p.db.Model(&RepoEventRecord{}).Where("repo = ?", usr).Limit(100).Select("seq") 262 + res := p.db.Where("repo_event_record_id in (?)", q).Delete(&RepoOpRecord{}) 263 + if err := res.Error; err != nil { 264 + return fmt.Errorf("failed to delete repo op records: %w", err) 265 + } 266 + 267 + if res.RowsAffected == 0 { 268 + break 269 + } 270 + } 271 + 272 + if err := p.db.Where("repo = ?", usr).Delete(&RepoEventRecord{}).Error; err != nil { 273 + return err 274 + } 275 + 276 + return nil 277 + }
+4
events/events.go
··· 217 217 218 218 return sub.outgoing, cleanup, nil 219 219 } 220 + 221 + func (em *EventManager) TakeDownRepo(ctx context.Context, user util.Uid) error { 222 + return em.persister.TakeDownRepo(ctx, user) 223 + }
+8
events/persist.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "fmt" 5 6 "sync" 7 + 8 + "github.com/bluesky-social/indigo/util" 6 9 ) 7 10 8 11 // Note that this interface looks generic, but some persisters might only work with RepoAppend or LabelLabels 9 12 type EventPersistence interface { 10 13 Persist(ctx context.Context, e *XRPCStreamEvent) error 11 14 Playback(ctx context.Context, since int64, cb func(*XRPCStreamEvent) error) error 15 + TakeDownRepo(ctx context.Context, usr util.Uid) error 12 16 } 13 17 14 18 // MemPersister is the most naive implementation of event persistence ··· 65 69 66 70 return nil 67 71 } 72 + 73 + func (mp *MemPersister) TakeDownRepo(ctx context.Context, uid util.Uid) error { 74 + return fmt.Errorf("repo takedowns not currently supported by memory persister, test usage only") 75 + }
+1 -1
indexer/indexer.go
··· 373 373 case *bsky.ActorProfile: 374 374 return nil 375 375 default: 376 - log.Warnf("unrecognized record type: %T", rec) 376 + log.Warnf("unrecognized record type: %T", op.Record) 377 377 return nil 378 378 } 379 379 }
+7
repomgr/repomgr.go
··· 1093 1093 1094 1094 return out, nil 1095 1095 } 1096 + 1097 + func (rm *RepoManager) TakeDownRepo(ctx context.Context, uid util.Uid) error { 1098 + unlock := rm.lockUser(ctx, uid) 1099 + defer unlock() 1100 + 1101 + return rm.cs.TakeDownRepo(ctx, uid) 1102 + }
+53
testing/integ_test.go
··· 260 260 hcevt := evts.Next() 261 261 fmt.Println(hcevt.RepoHandle) 262 262 } 263 + 264 + func TestBGSTakedown(t *testing.T) { 265 + if testing.Short() { 266 + t.Skip("skipping BGS test in 'short' test mode") 267 + } 268 + assert := assert.New(t) 269 + _ = assert 270 + 271 + didr := testPLC(t) 272 + p1 := mustSetupPDS(t, "localhost:5151", ".tpds", didr) 273 + p1.Run(t) 274 + 275 + b1 := mustSetupBGS(t, "localhost:3231", didr) 276 + b1.Run(t) 277 + 278 + p1.RequestScraping(t, b1) 279 + 280 + time.Sleep(time.Millisecond * 50) 281 + es1 := b1.Events(t, 0) 282 + 283 + bob := p1.MustNewUser(t, "bob.tpds") 284 + alice := p1.MustNewUser(t, "alice.tpds") 285 + 286 + bob.Post(t, "cats for cats") 287 + alice.Post(t, "no i like dogs") 288 + bp2 := bob.Post(t, "im a bad person who deserves to be taken down") 289 + bob.Like(t, bp2) 290 + 291 + expCount := 6 292 + evts1 := es1.WaitFor(expCount) 293 + assert.Equal(expCount, len(evts1)) 294 + 295 + assert.NoError(b1.bgs.TakeDownRepo(context.TODO(), bob.did)) 296 + 297 + es2 := b1.Events(t, 0) 298 + time.Sleep(time.Millisecond * 50) // wait for events to stream in and be collected 299 + evts2 := es2.WaitFor(2) 300 + 301 + assert.Equal(2, len(evts2)) 302 + for _, e := range evts2 { 303 + if e.RepoCommit.Repo == bob.did { 304 + t.Fatal("events from bob were not removed") 305 + } 306 + } 307 + 308 + bob.Post(t, "im gonna sneak through being banned") 309 + time.Sleep(time.Millisecond * 50) 310 + alice.Post(t, "im a normal person") 311 + // ensure events from bob dont get through 312 + 313 + last := es2.Next() 314 + assert.Equal(alice.did, last.RepoCommit.Repo) 315 + }
+15 -1
testing/utils.go
··· 377 377 378 378 notifman := notifs.NewNotificationManager(maindb, repoman.GetRecord) 379 379 380 - evtman := events.NewEventManager(events.NewMemPersister()) 380 + dbpersist, err := events.NewDbPersistence(maindb, cs) 381 + if err != nil { 382 + return nil, err 383 + } 384 + 385 + evtman := events.NewEventManager(dbpersist) 381 386 382 387 go evtman.Run() 383 388 ··· 492 497 out := make([]*events.XRPCStreamEvent, len(es.events)) 493 498 for i, e := range es.events { 494 499 out[i] = e 500 + } 501 + 502 + return out 503 + } 504 + 505 + func (es *eventStream) WaitFor(n int) []*events.XRPCStreamEvent { 506 + var out []*events.XRPCStreamEvent 507 + for i := 0; i < n; i++ { 508 + out = append(out, es.Next()) 495 509 } 496 510 497 511 return out
+1
xrpc/xrpc.go
··· 121 121 if resp.StatusCode != 200 { 122 122 var i interface{} 123 123 _ = json.NewDecoder(resp.Body).Decode(&i) 124 + //fmt.Println(i) 124 125 return fmt.Errorf("XRPC ERROR %d: %s", resp.StatusCode, resp.Status) 125 126 } 126 127