this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at feat/repo-parse-allocs 578 lines 14 kB view raw
1package testing 2 3import ( 4 "bytes" 5 "context" 6 "math/rand" 7 "strings" 8 "testing" 9 "time" 10 11 atproto "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/events" 13 "github.com/bluesky-social/indigo/repo" 14 "github.com/bluesky-social/indigo/xrpc" 15 "github.com/ipfs/go-cid" 16 car "github.com/ipld/go-car" 17 "github.com/stretchr/testify/assert" 18) 19 20func TestRelayBasic(t *testing.T) { 21 t.Helper() 22 testRelayBasic(t, true) 23} 24 25func TestRelayBasicNonArchive(t *testing.T) { 26 t.Helper() 27 testRelayBasic(t, false) 28} 29 30func testRelayBasic(t *testing.T, archive bool) { 31 if testing.Short() { 32 t.Skip("skipping Relay test in 'short' test mode") 33 } 34 assert := assert.New(t) 35 didr := TestPLC(t) 36 p1 := MustSetupPDS(t, ".tpds", didr) 37 p1.Run(t) 38 39 b1 := MustSetupRelay(t, didr, archive) 40 b1.Run(t) 41 42 b1.tr.TrialHosts = []string{p1.RawHost()} 43 44 p1.RequestScraping(t, b1) 45 p1.BumpLimits(t, b1) 46 47 time.Sleep(time.Millisecond * 50) 48 49 evts := b1.Events(t, -1) 50 defer evts.Cancel() 51 52 bob := p1.MustNewUser(t, "bob.tpds") 53 t.Log("event 1") 54 e1 := evts.Next() 55 assert.NotNil(e1.RepoCommit) 56 assert.Equal(e1.RepoCommit.Repo, bob.DID()) 57 58 alice := p1.MustNewUser(t, "alice.tpds") 59 t.Log("event 2") 60 e2 := evts.Next() 61 assert.NotNil(e2.RepoCommit) 62 assert.Equal(e2.RepoCommit.Repo, alice.DID()) 63 64 bp1 := bob.Post(t, "cats for cats") 65 ap1 := alice.Post(t, "no i like dogs") 66 67 _ = bp1 68 _ = ap1 69 70 t.Log("bob:", bob.DID()) 71 t.Log("event 3") 72 e3 := evts.Next() 73 assert.Equal(e3.RepoCommit.Repo, bob.DID()) 74 //assert.Equal(e3.RepoCommit.Ops[0].Kind, "createRecord") 75 76 t.Log("alice:", alice.DID()) 77 t.Log("event 4") 78 e4 := evts.Next() 79 assert.Equal(e4.RepoCommit.Repo, alice.DID()) 80 //assert.Equal(e4.RepoCommit.Ops[0].Kind, "createRecord") 81 82 // playback 83 pbevts := b1.Events(t, 2) 84 defer pbevts.Cancel() 85 86 t.Log("event 5") 87 pbe1 := pbevts.Next() 88 assert.Equal(*e3, *pbe1) 89} 90 91func randomFollows(t *testing.T, users []*TestUser) { 92 for n := 0; n < 3; n++ { 93 for i, u := range users { 94 oi := rand.Intn(len(users)) 95 if i == oi { 96 continue 97 } 98 99 u.Follow(t, users[oi].DID()) 100 } 101 } 102} 103 104func socialSim(t *testing.T, users []*TestUser, postiter, likeiter int) []*atproto.RepoStrongRef { 105 var posts []*atproto.RepoStrongRef 106 for i := 0; i < postiter; i++ { 107 for _, u := range users { 108 posts = append(posts, u.Post(t, MakeRandomPost())) 109 } 110 } 111 112 for i := 0; i < likeiter; i++ { 113 for _, u := range users { 114 u.Like(t, posts[rand.Intn(len(posts))]) 115 } 116 } 117 118 return posts 119} 120 121func TestRelayMultiPDS(t *testing.T) { 122 t.Helper() 123 testRelayMultiPDS(t, true) 124} 125 126func TestRelayMultiPDSNonArchive(t *testing.T) { 127 t.Helper() 128 testRelayMultiPDS(t, false) 129} 130 131func testRelayMultiPDS(t *testing.T, archive bool) { 132 if testing.Short() { 133 t.Skip("skipping Relay test in 'short' test mode") 134 } 135 //t.Skip("test too sleepy to run in CI for now") 136 137 assert := assert.New(t) 138 _ = assert 139 didr := TestPLC(t) 140 p1 := MustSetupPDS(t, ".pdsuno", didr) 141 p1.Run(t) 142 143 p2 := MustSetupPDS(t, ".pdsdos", didr) 144 p2.Run(t) 145 146 b1 := MustSetupRelay(t, didr, archive) 147 b1.Run(t) 148 149 b1.tr.TrialHosts = []string{p1.RawHost(), p2.RawHost()} 150 151 p1.RequestScraping(t, b1) 152 p1.BumpLimits(t, b1) 153 time.Sleep(time.Millisecond * 100) 154 155 var users []*TestUser 156 for i := 0; i < 5; i++ { 157 users = append(users, p1.MustNewUser(t, usernames[i]+".pdsuno")) 158 } 159 160 randomFollows(t, users) 161 socialSim(t, users, 10, 10) 162 163 var users2 []*TestUser 164 for i := 0; i < 5; i++ { 165 users2 = append(users2, p2.MustNewUser(t, usernames[i+5]+".pdsdos")) 166 } 167 168 randomFollows(t, users2) 169 p2posts := socialSim(t, users2, 10, 10) 170 171 randomFollows(t, append(users, users2...)) 172 173 users[0].Reply(t, p2posts[0], p2posts[0], "what a wonderful life") 174 175 // now if we make posts on pds 2, the relay will not hear about those new posts 176 177 p2posts2 := socialSim(t, users2, 10, 10) 178 179 time.Sleep(time.Second) 180 181 p2.RequestScraping(t, b1) 182 p2.BumpLimits(t, b1) 183 time.Sleep(time.Millisecond * 50) 184 185 // Now, the relay will discover a gap, and have to catch up somehow 186 socialSim(t, users2, 1, 0) 187 188 // we expect the relay to learn about posts that it did not directly see from 189 // repos its already partially scraped, as long as its seen *something* after the missing post 190 // this is the 'catchup' process 191 _ = p2posts2 192 /* NOTE: BGS doesn't support indexing any more 193 time.Sleep(time.Second) 194 ctx := context.Background() 195 _, err := b1.bgs.Index.GetPost(ctx, p2posts2[4].Uri) 196 if err != nil { 197 t.Fatal(err) 198 } 199 */ 200} 201 202func TestRelayMultiGap(t *testing.T) { 203 if testing.Short() { 204 t.Skip("skipping Relay test in 'short' test mode") 205 } 206 //t.Skip("test too sleepy to run in CI for now") 207 assert := assert.New(t) 208 _ = assert 209 didr := TestPLC(t) 210 p1 := MustSetupPDS(t, ".pdsuno", didr) 211 p1.Run(t) 212 213 p2 := MustSetupPDS(t, ".pdsdos", didr) 214 p2.Run(t) 215 216 b1 := MustSetupRelay(t, didr, true) 217 b1.Run(t) 218 219 b1.tr.TrialHosts = []string{p1.RawHost(), p2.RawHost()} 220 221 p1.RequestScraping(t, b1) 222 p1.BumpLimits(t, b1) 223 time.Sleep(time.Millisecond * 250) 224 225 users := []*TestUser{p1.MustNewUser(t, usernames[0]+".pdsuno")} 226 227 socialSim(t, users, 10, 0) 228 229 users2 := []*TestUser{p2.MustNewUser(t, usernames[1]+".pdsdos")} 230 231 p2posts := socialSim(t, users2, 10, 0) 232 233 users[0].Reply(t, p2posts[0], p2posts[0], "what a wonderful life") 234 235 /* NOTE: BGS doesn't support indexing any more 236 time.Sleep(time.Second * 2) 237 ctx := context.Background() 238 _, err := b1.bgs.Index.GetPost(ctx, p2posts[3].Uri) 239 if err != nil { 240 t.Fatal(err) 241 } 242 */ 243 244 // now if we make posts on pds 2, the relay will not hear about those new posts 245 246 p2posts2 := socialSim(t, users2, 10, 0) 247 248 time.Sleep(time.Second) 249 250 p2.RequestScraping(t, b1) 251 p2.BumpLimits(t, b1) 252 time.Sleep(time.Second * 2) 253 254 // Now, the relay will discover a gap, and have to catch up somehow 255 socialSim(t, users2, 1, 0) 256 257 // we expect the relay to learn about posts that it did not directly see from 258 // repos its already partially scraped, as long as its seen *something* after the missing post 259 // this is the 'catchup' process 260 _ = p2posts2 261 /* NOTE: BGS doesn't support indexing any more 262 time.Sleep(time.Second * 2) 263 _, err = b1.bgs.Index.GetPost(ctx, p2posts2[4].Uri) 264 if err != nil { 265 t.Fatal(err) 266 } 267 */ 268} 269 270func TestHandleChange(t *testing.T) { 271 //t.Skip("test too sleepy to run in CI for now") 272 assert := assert.New(t) 273 _ = assert 274 didr := TestPLC(t) 275 p1 := MustSetupPDS(t, ".pdsuno", didr) 276 p1.Run(t) 277 278 b1 := MustSetupRelay(t, didr, true) 279 b1.Run(t) 280 281 b1.tr.TrialHosts = []string{p1.RawHost()} 282 283 p1.RequestScraping(t, b1) 284 p1.BumpLimits(t, b1) 285 time.Sleep(time.Millisecond * 50) 286 287 evts := b1.Events(t, -1) 288 289 u := p1.MustNewUser(t, usernames[0]+".pdsuno") 290 291 // if the handle changes before the relay processes the first event, things 292 // get a little weird 293 time.Sleep(time.Millisecond * 50) 294 //socialSim(t, []*testUser{u}, 10, 0) 295 296 u.ChangeHandle(t, "catbear.pdsuno") 297 298 time.Sleep(time.Millisecond * 100) 299 300 initevt := evts.Next() 301 t.Log(initevt.RepoCommit) 302 hcevt := evts.Next() 303 t.Log(hcevt.RepoHandle) 304 idevt := evts.Next() 305 t.Log(idevt.RepoIdentity) 306} 307 308func TestAccountEvent(t *testing.T) { 309 assert := assert.New(t) 310 _ = assert 311 didr := TestPLC(t) 312 p1 := MustSetupPDS(t, ".pdsuno", didr) 313 p1.Run(t) 314 315 b1 := MustSetupRelay(t, didr, true) 316 b1.Run(t) 317 318 b1.tr.TrialHosts = []string{p1.RawHost()} 319 320 p1.RequestScraping(t, b1) 321 p1.BumpLimits(t, b1) 322 time.Sleep(time.Millisecond * 50) 323 324 evts := b1.Events(t, -1) 325 326 u := p1.MustNewUser(t, usernames[0]+".pdsuno") 327 328 // if the handle changes before the relay processes the first event, things 329 // get a little weird 330 time.Sleep(time.Millisecond * 50) 331 //socialSim(t, []*testUser{u}, 10, 0) 332 333 p1.TakedownRepo(t, u.DID()) 334 p1.ReactivateRepo(t, u.DID()) 335 p1.DeactivateRepo(t, u.DID()) 336 p1.ReactivateRepo(t, u.DID()) 337 p1.SuspendRepo(t, u.DID()) 338 p1.ReactivateRepo(t, u.DID()) 339 340 time.Sleep(time.Millisecond * 100) 341 342 initevt := evts.Next() 343 t.Log(initevt.RepoCommit) 344 345 // Takedown 346 acevt := evts.Next() 347 t.Log(acevt.RepoAccount) 348 assert.Equal(acevt.RepoAccount.Did, u.DID()) 349 assert.Equal(acevt.RepoAccount.Active, false) 350 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusTakendown) 351 352 // Reactivate 353 acevt = evts.Next() 354 t.Log(acevt.RepoAccount) 355 assert.Equal(acevt.RepoAccount.Did, u.DID()) 356 assert.Equal(acevt.RepoAccount.Active, true) 357 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive) 358 359 // Deactivate 360 acevt = evts.Next() 361 t.Log(acevt.RepoAccount) 362 assert.Equal(acevt.RepoAccount.Did, u.DID()) 363 assert.Equal(acevt.RepoAccount.Active, false) 364 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusDeactivated) 365 366 // Reactivate 367 acevt = evts.Next() 368 t.Log(acevt.RepoAccount) 369 assert.Equal(acevt.RepoAccount.Did, u.DID()) 370 assert.Equal(acevt.RepoAccount.Active, true) 371 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive) 372 373 // Suspend 374 acevt = evts.Next() 375 t.Log(acevt.RepoAccount) 376 assert.Equal(acevt.RepoAccount.Did, u.DID()) 377 assert.Equal(acevt.RepoAccount.Active, false) 378 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusSuspended) 379 380 // Reactivate 381 acevt = evts.Next() 382 t.Log(acevt.RepoAccount) 383 assert.Equal(acevt.RepoAccount.Did, u.DID()) 384 assert.Equal(acevt.RepoAccount.Active, true) 385 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive) 386 387 // Takedown at Relay level, then emit active event and make sure relay overrides it 388 b1.bgs.TakeDownRepo(context.TODO(), u.DID()) 389 p1.ReactivateRepo(t, u.DID()) 390 391 time.Sleep(time.Millisecond * 20) 392 393 acevt = evts.Next() 394 t.Log(acevt.RepoAccount) 395 assert.Equal(acevt.RepoAccount.Did, u.DID()) 396 assert.Equal(acevt.RepoAccount.Active, false) 397 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusTakendown) 398 399 // Reactivate at Relay level, then emit an active account event and make sure relay passes it through 400 b1.bgs.ReverseTakedown(context.TODO(), u.DID()) 401 p1.ReactivateRepo(t, u.DID()) 402 403 time.Sleep(time.Millisecond * 20) 404 405 acevt = evts.Next() 406 t.Log(acevt.RepoAccount) 407 assert.Equal(acevt.RepoAccount.Did, u.DID()) 408 assert.Equal(acevt.RepoAccount.Active, true) 409 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive) 410} 411 412func TestRelayTakedown(t *testing.T) { 413 testRelayTakedown(t, true) 414} 415 416func TestRelayTakedownNonArchive(t *testing.T) { 417 testRelayTakedown(t, false) 418} 419 420func testRelayTakedown(t *testing.T, archive bool) { 421 if testing.Short() { 422 t.Skip("skipping Relay test in 'short' test mode") 423 } 424 assert := assert.New(t) 425 _ = assert 426 427 didr := TestPLC(t) 428 p1 := MustSetupPDS(t, ".tpds", didr) 429 p1.Run(t) 430 431 b1 := MustSetupRelay(t, didr, true) 432 b1.Run(t) 433 434 b1.tr.TrialHosts = []string{p1.RawHost()} 435 436 p1.RequestScraping(t, b1) 437 p1.BumpLimits(t, b1) 438 439 time.Sleep(time.Millisecond * 50) 440 es1 := b1.Events(t, 0) 441 442 bob := p1.MustNewUser(t, "bob.tpds") 443 alice := p1.MustNewUser(t, "alice.tpds") 444 445 bob.Post(t, "cats for cats") 446 alice.Post(t, "no i like dogs") 447 bp2 := bob.Post(t, "im a bad person who deserves to be taken down") 448 bob.Like(t, bp2) 449 450 expCount := 6 451 evts1 := es1.WaitFor(expCount) 452 assert.Equal(expCount, len(evts1)) 453 454 assert.NoError(b1.bgs.TakeDownRepo(context.TODO(), bob.did)) 455 456 es2 := b1.Events(t, 0) 457 time.Sleep(time.Millisecond * 50) // wait for events to stream in and be collected 458 evts2 := es2.WaitFor(2) 459 460 assert.Equal(2, len(evts2)) 461 for _, e := range evts2 { 462 if e.RepoCommit.Repo == bob.did { 463 t.Fatal("events from bob were not removed") 464 } 465 } 466 467 bob.Post(t, "im gonna sneak through being banned") 468 time.Sleep(time.Millisecond * 50) 469 alice.Post(t, "im a normal person") 470 // ensure events from bob dont get through 471 472 last := es2.Next() 473 assert.Equal(alice.did, last.RepoCommit.Repo) 474} 475 476func commitFromSlice(t *testing.T, slice []byte, rcid cid.Cid) *repo.SignedCommit { 477 carr, err := car.NewCarReader(bytes.NewReader(slice)) 478 if err != nil { 479 t.Fatal(err) 480 } 481 482 for { 483 blk, err := carr.Next() 484 if err != nil { 485 t.Fatal(err) 486 } 487 488 if blk.Cid() == rcid { 489 490 var sc repo.SignedCommit 491 if err := sc.UnmarshalCBOR(bytes.NewReader(blk.RawData())); err != nil { 492 t.Fatal(err) 493 } 494 return &sc 495 } 496 } 497} 498 499func TestDomainBans(t *testing.T) { 500 if testing.Short() { 501 t.Skip("skipping Relay test in 'short' test mode") 502 } 503 didr := TestPLC(t) 504 505 b1 := MustSetupRelay(t, didr, true) 506 b1.Run(t) 507 508 b1.BanDomain(t, "foo.com") 509 510 c := &xrpc.Client{Host: "http://" + b1.Host()} 511 if err := atproto.SyncRequestCrawl(context.TODO(), c, &atproto.SyncRequestCrawl_Input{Hostname: "foo.com"}); err == nil { 512 t.Fatal("domain should be banned") 513 } 514 515 if err := atproto.SyncRequestCrawl(context.TODO(), c, &atproto.SyncRequestCrawl_Input{Hostname: "pds.foo.com"}); err == nil { 516 t.Fatal("domain should be banned") 517 } 518 519 err := atproto.SyncRequestCrawl(context.TODO(), c, &atproto.SyncRequestCrawl_Input{Hostname: "app.pds.foo.com"}) 520 if err == nil { 521 t.Fatal("domain should be banned") 522 } 523 524 if !strings.Contains(err.Error(), "XRPC ERROR 401") { 525 t.Fatal("should have failed with a 401") 526 } 527 528 // should not be banned 529 err = atproto.SyncRequestCrawl(context.TODO(), c, &atproto.SyncRequestCrawl_Input{Hostname: "foo.bar.com"}) 530 if err == nil { 531 t.Fatal("should still fail") 532 } 533 534 if !strings.Contains(err.Error(), "XRPC ERROR 400") { 535 t.Fatal("should have failed with a 400") 536 } 537} 538 539func TestRelayHandleEmptyEvent(t *testing.T) { 540 if testing.Short() { 541 t.Skip("skipping Relay test in 'short' test mode") 542 } 543 assert := assert.New(t) 544 didr := TestPLC(t) 545 p1 := MustSetupPDS(t, ".tpds", didr) 546 p1.Run(t) 547 548 b1 := MustSetupRelay(t, didr, true) 549 b1.Run(t) 550 551 b1.tr.TrialHosts = []string{p1.RawHost()} 552 553 p1.RequestScraping(t, b1) 554 p1.BumpLimits(t, b1) 555 556 time.Sleep(time.Millisecond * 50) 557 558 evts := b1.Events(t, -1) 559 defer evts.Cancel() 560 561 bob := p1.MustNewUser(t, "bob.tpds") 562 t.Log("event 1") 563 e1 := evts.Next() 564 assert.NotNil(e1.RepoCommit) 565 assert.Equal(e1.RepoCommit.Repo, bob.DID()) 566 t.Log(e1.RepoCommit.Ops[0]) 567 568 ctx := context.TODO() 569 rm := p1.server.Repoman() 570 if err := rm.BatchWrite(ctx, 1, nil); err != nil { 571 t.Fatal(err) 572 } 573 574 e2 := evts.Next() 575 //t.Log(e2.RepoCommit.Ops[0]) 576 assert.Equal(len(e2.RepoCommit.Ops), 0) 577 assert.Equal(e2.RepoCommit.Repo, bob.DID()) 578}