this repo has no description
1package testing
2
3import (
4 "bytes"
5 "context"
6 "math/rand"
7 "strings"
8 "testing"
9 "time"
10
11 atproto "github.com/bluesky-social/indigo/api/atproto"
12 "github.com/bluesky-social/indigo/events"
13 "github.com/bluesky-social/indigo/repo"
14 "github.com/bluesky-social/indigo/xrpc"
15 "github.com/ipfs/go-cid"
16 car "github.com/ipld/go-car"
17 "github.com/stretchr/testify/assert"
18)
19
20func TestRelayBasic(t *testing.T) {
21 t.Helper()
22 testRelayBasic(t, true)
23}
24
25func TestRelayBasicNonArchive(t *testing.T) {
26 t.Helper()
27 testRelayBasic(t, false)
28}
29
30func testRelayBasic(t *testing.T, archive bool) {
31 if testing.Short() {
32 t.Skip("skipping Relay test in 'short' test mode")
33 }
34 assert := assert.New(t)
35 didr := TestPLC(t)
36 p1 := MustSetupPDS(t, ".tpds", didr)
37 p1.Run(t)
38
39 b1 := MustSetupRelay(t, didr, archive)
40 b1.Run(t)
41
42 b1.tr.TrialHosts = []string{p1.RawHost()}
43
44 p1.RequestScraping(t, b1)
45 p1.BumpLimits(t, b1)
46
47 time.Sleep(time.Millisecond * 50)
48
49 evts := b1.Events(t, -1)
50 defer evts.Cancel()
51
52 bob := p1.MustNewUser(t, "bob.tpds")
53 t.Log("event 1")
54 e1 := evts.Next()
55 assert.NotNil(e1.RepoCommit)
56 assert.Equal(e1.RepoCommit.Repo, bob.DID())
57
58 alice := p1.MustNewUser(t, "alice.tpds")
59 t.Log("event 2")
60 e2 := evts.Next()
61 assert.NotNil(e2.RepoCommit)
62 assert.Equal(e2.RepoCommit.Repo, alice.DID())
63
64 bp1 := bob.Post(t, "cats for cats")
65 ap1 := alice.Post(t, "no i like dogs")
66
67 _ = bp1
68 _ = ap1
69
70 t.Log("bob:", bob.DID())
71 t.Log("event 3")
72 e3 := evts.Next()
73 assert.Equal(e3.RepoCommit.Repo, bob.DID())
74 //assert.Equal(e3.RepoCommit.Ops[0].Kind, "createRecord")
75
76 t.Log("alice:", alice.DID())
77 t.Log("event 4")
78 e4 := evts.Next()
79 assert.Equal(e4.RepoCommit.Repo, alice.DID())
80 //assert.Equal(e4.RepoCommit.Ops[0].Kind, "createRecord")
81
82 // playback
83 pbevts := b1.Events(t, 2)
84 defer pbevts.Cancel()
85
86 t.Log("event 5")
87 pbe1 := pbevts.Next()
88 assert.Equal(*e3, *pbe1)
89}
90
91func randomFollows(t *testing.T, users []*TestUser) {
92 for n := 0; n < 3; n++ {
93 for i, u := range users {
94 oi := rand.Intn(len(users))
95 if i == oi {
96 continue
97 }
98
99 u.Follow(t, users[oi].DID())
100 }
101 }
102}
103
104func socialSim(t *testing.T, users []*TestUser, postiter, likeiter int) []*atproto.RepoStrongRef {
105 var posts []*atproto.RepoStrongRef
106 for i := 0; i < postiter; i++ {
107 for _, u := range users {
108 posts = append(posts, u.Post(t, MakeRandomPost()))
109 }
110 }
111
112 for i := 0; i < likeiter; i++ {
113 for _, u := range users {
114 u.Like(t, posts[rand.Intn(len(posts))])
115 }
116 }
117
118 return posts
119}
120
121func TestRelayMultiPDS(t *testing.T) {
122 t.Helper()
123 testRelayMultiPDS(t, true)
124}
125
126func TestRelayMultiPDSNonArchive(t *testing.T) {
127 t.Helper()
128 testRelayMultiPDS(t, false)
129}
130
131func testRelayMultiPDS(t *testing.T, archive bool) {
132 if testing.Short() {
133 t.Skip("skipping Relay test in 'short' test mode")
134 }
135 //t.Skip("test too sleepy to run in CI for now")
136
137 assert := assert.New(t)
138 _ = assert
139 didr := TestPLC(t)
140 p1 := MustSetupPDS(t, ".pdsuno", didr)
141 p1.Run(t)
142
143 p2 := MustSetupPDS(t, ".pdsdos", didr)
144 p2.Run(t)
145
146 b1 := MustSetupRelay(t, didr, archive)
147 b1.Run(t)
148
149 b1.tr.TrialHosts = []string{p1.RawHost(), p2.RawHost()}
150
151 p1.RequestScraping(t, b1)
152 p1.BumpLimits(t, b1)
153 time.Sleep(time.Millisecond * 100)
154
155 var users []*TestUser
156 for i := 0; i < 5; i++ {
157 users = append(users, p1.MustNewUser(t, usernames[i]+".pdsuno"))
158 }
159
160 randomFollows(t, users)
161 socialSim(t, users, 10, 10)
162
163 var users2 []*TestUser
164 for i := 0; i < 5; i++ {
165 users2 = append(users2, p2.MustNewUser(t, usernames[i+5]+".pdsdos"))
166 }
167
168 randomFollows(t, users2)
169 p2posts := socialSim(t, users2, 10, 10)
170
171 randomFollows(t, append(users, users2...))
172
173 users[0].Reply(t, p2posts[0], p2posts[0], "what a wonderful life")
174
175 // now if we make posts on pds 2, the relay will not hear about those new posts
176
177 p2posts2 := socialSim(t, users2, 10, 10)
178
179 time.Sleep(time.Second)
180
181 p2.RequestScraping(t, b1)
182 p2.BumpLimits(t, b1)
183 time.Sleep(time.Millisecond * 50)
184
185 // Now, the relay will discover a gap, and have to catch up somehow
186 socialSim(t, users2, 1, 0)
187
188 // we expect the relay to learn about posts that it did not directly see from
189 // repos its already partially scraped, as long as its seen *something* after the missing post
190 // this is the 'catchup' process
191 _ = p2posts2
192 /* NOTE: BGS doesn't support indexing any more
193 time.Sleep(time.Second)
194 ctx := context.Background()
195 _, err := b1.bgs.Index.GetPost(ctx, p2posts2[4].Uri)
196 if err != nil {
197 t.Fatal(err)
198 }
199 */
200}
201
202func TestRelayMultiGap(t *testing.T) {
203 if testing.Short() {
204 t.Skip("skipping Relay test in 'short' test mode")
205 }
206 //t.Skip("test too sleepy to run in CI for now")
207 assert := assert.New(t)
208 _ = assert
209 didr := TestPLC(t)
210 p1 := MustSetupPDS(t, ".pdsuno", didr)
211 p1.Run(t)
212
213 p2 := MustSetupPDS(t, ".pdsdos", didr)
214 p2.Run(t)
215
216 b1 := MustSetupRelay(t, didr, true)
217 b1.Run(t)
218
219 b1.tr.TrialHosts = []string{p1.RawHost(), p2.RawHost()}
220
221 p1.RequestScraping(t, b1)
222 p1.BumpLimits(t, b1)
223 time.Sleep(time.Millisecond * 250)
224
225 users := []*TestUser{p1.MustNewUser(t, usernames[0]+".pdsuno")}
226
227 socialSim(t, users, 10, 0)
228
229 users2 := []*TestUser{p2.MustNewUser(t, usernames[1]+".pdsdos")}
230
231 p2posts := socialSim(t, users2, 10, 0)
232
233 users[0].Reply(t, p2posts[0], p2posts[0], "what a wonderful life")
234
235 /* NOTE: BGS doesn't support indexing any more
236 time.Sleep(time.Second * 2)
237 ctx := context.Background()
238 _, err := b1.bgs.Index.GetPost(ctx, p2posts[3].Uri)
239 if err != nil {
240 t.Fatal(err)
241 }
242 */
243
244 // now if we make posts on pds 2, the relay will not hear about those new posts
245
246 p2posts2 := socialSim(t, users2, 10, 0)
247
248 time.Sleep(time.Second)
249
250 p2.RequestScraping(t, b1)
251 p2.BumpLimits(t, b1)
252 time.Sleep(time.Second * 2)
253
254 // Now, the relay will discover a gap, and have to catch up somehow
255 socialSim(t, users2, 1, 0)
256
257 // we expect the relay to learn about posts that it did not directly see from
258 // repos its already partially scraped, as long as its seen *something* after the missing post
259 // this is the 'catchup' process
260 _ = p2posts2
261 /* NOTE: BGS doesn't support indexing any more
262 time.Sleep(time.Second * 2)
263 _, err = b1.bgs.Index.GetPost(ctx, p2posts2[4].Uri)
264 if err != nil {
265 t.Fatal(err)
266 }
267 */
268}
269
270func TestHandleChange(t *testing.T) {
271 //t.Skip("test too sleepy to run in CI for now")
272 assert := assert.New(t)
273 _ = assert
274 didr := TestPLC(t)
275 p1 := MustSetupPDS(t, ".pdsuno", didr)
276 p1.Run(t)
277
278 b1 := MustSetupRelay(t, didr, true)
279 b1.Run(t)
280
281 b1.tr.TrialHosts = []string{p1.RawHost()}
282
283 p1.RequestScraping(t, b1)
284 p1.BumpLimits(t, b1)
285 time.Sleep(time.Millisecond * 50)
286
287 evts := b1.Events(t, -1)
288
289 u := p1.MustNewUser(t, usernames[0]+".pdsuno")
290
291 // if the handle changes before the relay processes the first event, things
292 // get a little weird
293 time.Sleep(time.Millisecond * 50)
294 //socialSim(t, []*testUser{u}, 10, 0)
295
296 u.ChangeHandle(t, "catbear.pdsuno")
297
298 time.Sleep(time.Millisecond * 100)
299
300 initevt := evts.Next()
301 t.Log(initevt.RepoCommit)
302 hcevt := evts.Next()
303 t.Log(hcevt.RepoHandle)
304 idevt := evts.Next()
305 t.Log(idevt.RepoIdentity)
306}
307
308func TestAccountEvent(t *testing.T) {
309 assert := assert.New(t)
310 _ = assert
311 didr := TestPLC(t)
312 p1 := MustSetupPDS(t, ".pdsuno", didr)
313 p1.Run(t)
314
315 b1 := MustSetupRelay(t, didr, true)
316 b1.Run(t)
317
318 b1.tr.TrialHosts = []string{p1.RawHost()}
319
320 p1.RequestScraping(t, b1)
321 p1.BumpLimits(t, b1)
322 time.Sleep(time.Millisecond * 50)
323
324 evts := b1.Events(t, -1)
325
326 u := p1.MustNewUser(t, usernames[0]+".pdsuno")
327
328 // if the handle changes before the relay processes the first event, things
329 // get a little weird
330 time.Sleep(time.Millisecond * 50)
331 //socialSim(t, []*testUser{u}, 10, 0)
332
333 p1.TakedownRepo(t, u.DID())
334 p1.ReactivateRepo(t, u.DID())
335 p1.DeactivateRepo(t, u.DID())
336 p1.ReactivateRepo(t, u.DID())
337 p1.SuspendRepo(t, u.DID())
338 p1.ReactivateRepo(t, u.DID())
339
340 time.Sleep(time.Millisecond * 100)
341
342 initevt := evts.Next()
343 t.Log(initevt.RepoCommit)
344
345 // Takedown
346 acevt := evts.Next()
347 t.Log(acevt.RepoAccount)
348 assert.Equal(acevt.RepoAccount.Did, u.DID())
349 assert.Equal(acevt.RepoAccount.Active, false)
350 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusTakendown)
351
352 // Reactivate
353 acevt = evts.Next()
354 t.Log(acevt.RepoAccount)
355 assert.Equal(acevt.RepoAccount.Did, u.DID())
356 assert.Equal(acevt.RepoAccount.Active, true)
357 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive)
358
359 // Deactivate
360 acevt = evts.Next()
361 t.Log(acevt.RepoAccount)
362 assert.Equal(acevt.RepoAccount.Did, u.DID())
363 assert.Equal(acevt.RepoAccount.Active, false)
364 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusDeactivated)
365
366 // Reactivate
367 acevt = evts.Next()
368 t.Log(acevt.RepoAccount)
369 assert.Equal(acevt.RepoAccount.Did, u.DID())
370 assert.Equal(acevt.RepoAccount.Active, true)
371 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive)
372
373 // Suspend
374 acevt = evts.Next()
375 t.Log(acevt.RepoAccount)
376 assert.Equal(acevt.RepoAccount.Did, u.DID())
377 assert.Equal(acevt.RepoAccount.Active, false)
378 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusSuspended)
379
380 // Reactivate
381 acevt = evts.Next()
382 t.Log(acevt.RepoAccount)
383 assert.Equal(acevt.RepoAccount.Did, u.DID())
384 assert.Equal(acevt.RepoAccount.Active, true)
385 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive)
386
387 // Takedown at Relay level, then emit active event and make sure relay overrides it
388 b1.bgs.TakeDownRepo(context.TODO(), u.DID())
389 p1.ReactivateRepo(t, u.DID())
390
391 time.Sleep(time.Millisecond * 20)
392
393 acevt = evts.Next()
394 t.Log(acevt.RepoAccount)
395 assert.Equal(acevt.RepoAccount.Did, u.DID())
396 assert.Equal(acevt.RepoAccount.Active, false)
397 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusTakendown)
398
399 // Reactivate at Relay level, then emit an active account event and make sure relay passes it through
400 b1.bgs.ReverseTakedown(context.TODO(), u.DID())
401 p1.ReactivateRepo(t, u.DID())
402
403 time.Sleep(time.Millisecond * 20)
404
405 acevt = evts.Next()
406 t.Log(acevt.RepoAccount)
407 assert.Equal(acevt.RepoAccount.Did, u.DID())
408 assert.Equal(acevt.RepoAccount.Active, true)
409 assert.Equal(*acevt.RepoAccount.Status, events.AccountStatusActive)
410}
411
412func TestRelayTakedown(t *testing.T) {
413 testRelayTakedown(t, true)
414}
415
416func TestRelayTakedownNonArchive(t *testing.T) {
417 testRelayTakedown(t, false)
418}
419
420func testRelayTakedown(t *testing.T, archive bool) {
421 if testing.Short() {
422 t.Skip("skipping Relay test in 'short' test mode")
423 }
424 assert := assert.New(t)
425 _ = assert
426
427 didr := TestPLC(t)
428 p1 := MustSetupPDS(t, ".tpds", didr)
429 p1.Run(t)
430
431 b1 := MustSetupRelay(t, didr, true)
432 b1.Run(t)
433
434 b1.tr.TrialHosts = []string{p1.RawHost()}
435
436 p1.RequestScraping(t, b1)
437 p1.BumpLimits(t, b1)
438
439 time.Sleep(time.Millisecond * 50)
440 es1 := b1.Events(t, 0)
441
442 bob := p1.MustNewUser(t, "bob.tpds")
443 alice := p1.MustNewUser(t, "alice.tpds")
444
445 bob.Post(t, "cats for cats")
446 alice.Post(t, "no i like dogs")
447 bp2 := bob.Post(t, "im a bad person who deserves to be taken down")
448 bob.Like(t, bp2)
449
450 expCount := 6
451 evts1 := es1.WaitFor(expCount)
452 assert.Equal(expCount, len(evts1))
453
454 assert.NoError(b1.bgs.TakeDownRepo(context.TODO(), bob.did))
455
456 es2 := b1.Events(t, 0)
457 time.Sleep(time.Millisecond * 50) // wait for events to stream in and be collected
458 evts2 := es2.WaitFor(2)
459
460 assert.Equal(2, len(evts2))
461 for _, e := range evts2 {
462 if e.RepoCommit.Repo == bob.did {
463 t.Fatal("events from bob were not removed")
464 }
465 }
466
467 bob.Post(t, "im gonna sneak through being banned")
468 time.Sleep(time.Millisecond * 50)
469 alice.Post(t, "im a normal person")
470 // ensure events from bob dont get through
471
472 last := es2.Next()
473 assert.Equal(alice.did, last.RepoCommit.Repo)
474}
475
476func commitFromSlice(t *testing.T, slice []byte, rcid cid.Cid) *repo.SignedCommit {
477 carr, err := car.NewCarReader(bytes.NewReader(slice))
478 if err != nil {
479 t.Fatal(err)
480 }
481
482 for {
483 blk, err := carr.Next()
484 if err != nil {
485 t.Fatal(err)
486 }
487
488 if blk.Cid() == rcid {
489
490 var sc repo.SignedCommit
491 if err := sc.UnmarshalCBOR(bytes.NewReader(blk.RawData())); err != nil {
492 t.Fatal(err)
493 }
494 return &sc
495 }
496 }
497}
498
499func TestDomainBans(t *testing.T) {
500 if testing.Short() {
501 t.Skip("skipping Relay test in 'short' test mode")
502 }
503 didr := TestPLC(t)
504
505 b1 := MustSetupRelay(t, didr, true)
506 b1.Run(t)
507
508 b1.BanDomain(t, "foo.com")
509
510 c := &xrpc.Client{Host: "http://" + b1.Host()}
511 if err := atproto.SyncRequestCrawl(context.TODO(), c, &atproto.SyncRequestCrawl_Input{Hostname: "foo.com"}); err == nil {
512 t.Fatal("domain should be banned")
513 }
514
515 if err := atproto.SyncRequestCrawl(context.TODO(), c, &atproto.SyncRequestCrawl_Input{Hostname: "pds.foo.com"}); err == nil {
516 t.Fatal("domain should be banned")
517 }
518
519 err := atproto.SyncRequestCrawl(context.TODO(), c, &atproto.SyncRequestCrawl_Input{Hostname: "app.pds.foo.com"})
520 if err == nil {
521 t.Fatal("domain should be banned")
522 }
523
524 if !strings.Contains(err.Error(), "XRPC ERROR 401") {
525 t.Fatal("should have failed with a 401")
526 }
527
528 // should not be banned
529 err = atproto.SyncRequestCrawl(context.TODO(), c, &atproto.SyncRequestCrawl_Input{Hostname: "foo.bar.com"})
530 if err == nil {
531 t.Fatal("should still fail")
532 }
533
534 if !strings.Contains(err.Error(), "XRPC ERROR 400") {
535 t.Fatal("should have failed with a 400")
536 }
537}
538
539func TestRelayHandleEmptyEvent(t *testing.T) {
540 if testing.Short() {
541 t.Skip("skipping Relay test in 'short' test mode")
542 }
543 assert := assert.New(t)
544 didr := TestPLC(t)
545 p1 := MustSetupPDS(t, ".tpds", didr)
546 p1.Run(t)
547
548 b1 := MustSetupRelay(t, didr, true)
549 b1.Run(t)
550
551 b1.tr.TrialHosts = []string{p1.RawHost()}
552
553 p1.RequestScraping(t, b1)
554 p1.BumpLimits(t, b1)
555
556 time.Sleep(time.Millisecond * 50)
557
558 evts := b1.Events(t, -1)
559 defer evts.Cancel()
560
561 bob := p1.MustNewUser(t, "bob.tpds")
562 t.Log("event 1")
563 e1 := evts.Next()
564 assert.NotNil(e1.RepoCommit)
565 assert.Equal(e1.RepoCommit.Repo, bob.DID())
566 t.Log(e1.RepoCommit.Ops[0])
567
568 ctx := context.TODO()
569 rm := p1.server.Repoman()
570 if err := rm.BatchWrite(ctx, 1, nil); err != nil {
571 t.Fatal(err)
572 }
573
574 e2 := evts.Next()
575 //t.Log(e2.RepoCommit.Ops[0])
576 assert.Equal(len(e2.RepoCommit.Ops), 0)
577 assert.Equal(e2.RepoCommit.Repo, bob.DID())
578}