+114
-3
Diff
round #0
+11
internal/firehose/profile_watcher.go
+11
internal/firehose/profile_watcher.go
···
280
280
}
281
281
}
282
282
283
+
// ProcessEvent processes a single Jetstream event through the profile-watcher
284
+
// pipeline. Exported for use in integration tests where events are fed from a
285
+
// test PDS firehose rather than a live Jetstream connection. Production code
286
+
// reaches this path via processMessage.
287
+
func (pw *ProfileWatcher) ProcessEvent(event JetstreamEvent) {
288
+
pw.dispatch(event)
289
+
}
290
+
283
291
func (pw *ProfileWatcher) processMessage(data []byte) {
284
292
var event JetstreamEvent
285
293
if err := json.Unmarshal(data, &event); err != nil {
286
294
return
287
295
}
296
+
pw.dispatch(event)
297
+
}
288
298
299
+
func (pw *ProfileWatcher) dispatch(event JetstreamEvent) {
289
300
switch event.Kind {
290
301
case "commit":
291
302
if event.Commit == nil || event.Commit.Collection != NSIDBlueskyProfile {
+58
tests/integration/firehose_test.go
+58
tests/integration/firehose_test.go
···
455
455
}
456
456
}
457
457
458
+
// TestFirehose_AccountDeactivated_KeepsData verifies an end-to-end account
459
+
// status change: deactivating an account on the test PDS emits a real #account
460
+
// firehose event (status=deactivated), which our bridge forwards to the
461
+
// ProfileWatcher. Because deactivation is reversible, indexed records must NOT
462
+
// be purged.
463
+
func TestFirehose_AccountDeactivated_KeepsData(t *testing.T) {
464
+
h := StartHarness(t, &HarnessOptions{EnableFirehose: true})
465
+
ctx := context.Background()
466
+
467
+
rkey := mustRKey(t, h.PostForm("/api/roasters", form("name", "Survives Deactivation")), "roaster")
468
+
uri := atproto.BuildATURI(h.PrimaryAccount.DID, atproto.NSIDRoaster, rkey)
469
+
h.WaitForRecord(uri, firehoseWait)
470
+
471
+
apiClient := h.accounts[h.PrimaryAccount.DID]
472
+
require.NotNil(t, apiClient)
473
+
474
+
err := apiClient.Post(ctx, "com.atproto.server.deactivateAccount", map[string]any{}, nil)
475
+
require.NoError(t, err)
476
+
477
+
// Give the firehose bridge time to dispatch the event.
478
+
time.Sleep(500 * time.Millisecond)
479
+
480
+
rec, err := h.FeedIndex.GetRecord(ctx, uri)
481
+
assert.NoError(t, err)
482
+
assert.NotNil(t, rec, "deactivated accounts are reversible โ record should remain indexed")
483
+
}
484
+
485
+
// TestFirehose_AccountDeleted_PurgesData verifies the deletion path through
486
+
// the bridge โ ProfileWatcher โ DeleteAllByDID pipeline. testpds gates real
487
+
// account deletion behind a server-side token (not exposed to clients), so we
488
+
// synthesize the account event directly into the watcher to exercise the
489
+
// integration wiring without bypassing it.
490
+
func TestFirehose_AccountDeleted_PurgesData(t *testing.T) {
491
+
h := StartHarness(t, &HarnessOptions{EnableFirehose: true})
492
+
ctx := context.Background()
493
+
494
+
rkey := mustRKey(t, h.PostForm("/api/roasters", form("name", "About to be Purged")), "roaster")
495
+
uri := atproto.BuildATURI(h.PrimaryAccount.DID, atproto.NSIDRoaster, rkey)
496
+
h.WaitForRecord(uri, firehoseWait)
497
+
498
+
require.NotNil(t, h.ProfileWatcher)
499
+
h.ProfileWatcher.ProcessEvent(firehose.JetstreamEvent{
500
+
DID: h.PrimaryAccount.DID,
501
+
TimeUS: time.Now().UnixMicro(),
502
+
Kind: "account",
503
+
Account: &firehose.JetstreamAccount{
504
+
Active: false,
505
+
DID: h.PrimaryAccount.DID,
506
+
Status: "deleted",
507
+
Time: time.Now().Format(time.RFC3339),
508
+
},
509
+
})
510
+
511
+
rec, err := h.FeedIndex.GetRecord(ctx, uri)
512
+
assert.NoError(t, err)
513
+
assert.Nil(t, rec, "deleted account's records should be purged from index")
514
+
}
515
+
458
516
// --- helpers ---
459
517
460
518
// waitFor polls condition until it returns true or the timeout expires.
+45
-3
tests/integration/harness.go
+45
-3
tests/integration/harness.go
···
88
88
PDS *testpds.TestPDS
89
89
Server *httptest.Server
90
90
Handler *handlers.Handler
91
-
FeedIndex *firehose.FeedIndex
92
-
Consumer *firehose.Consumer
93
-
SessionCache *atproto.SessionCache
91
+
FeedIndex *firehose.FeedIndex
92
+
Consumer *firehose.Consumer
93
+
ProfileWatcher *firehose.ProfileWatcher
94
+
SessionCache *atproto.SessionCache
94
95
95
96
// PrimaryAccount is the default account created on harness setup.
96
97
PrimaryAccount TestAccount
···
237
238
WantedCollections: firehose.ArabicaCollections,
238
239
}, feedIndex)
239
240
harness.Consumer = consumer
241
+
harness.ProfileWatcher = firehose.NewProfileWatcher(&firehose.Config{}, feedIndex)
240
242
241
243
ctx, cancel := context.WithCancel(context.Background())
242
244
harness.firehoseCancel = cancel
···
515
517
if !ok {
516
518
return
517
519
}
520
+
521
+
if evt.Identity != nil && h.ProfileWatcher != nil {
522
+
handle := ""
523
+
if evt.Identity.Handle != nil {
524
+
handle = *evt.Identity.Handle
525
+
}
526
+
h.ProfileWatcher.ProcessEvent(firehose.JetstreamEvent{
527
+
DID: evt.Identity.Did,
528
+
TimeUS: time.Now().UnixMicro(),
529
+
Kind: "identity",
530
+
Identity: &firehose.JetstreamIdentity{
531
+
DID: evt.Identity.Did,
532
+
Handle: handle,
533
+
Seq: evt.Identity.Seq,
534
+
Time: evt.Identity.Time,
535
+
},
536
+
})
537
+
continue
538
+
}
539
+
540
+
if evt.Account != nil && h.ProfileWatcher != nil {
541
+
status := ""
542
+
if evt.Account.Status != nil {
543
+
status = *evt.Account.Status
544
+
}
545
+
h.ProfileWatcher.ProcessEvent(firehose.JetstreamEvent{
546
+
DID: evt.Account.Did,
547
+
TimeUS: time.Now().UnixMicro(),
548
+
Kind: "account",
549
+
Account: &firehose.JetstreamAccount{
550
+
Active: evt.Account.Active,
551
+
DID: evt.Account.Did,
552
+
Seq: evt.Account.Seq,
553
+
Status: status,
554
+
Time: evt.Account.Time,
555
+
},
556
+
})
557
+
continue
558
+
}
559
+
518
560
if evt.Commit == nil {
519
561
continue
520
562
}
History
1 round
0 comments
pdewey.com
submitted
#0
1 commit
expand
collapse
feat: listen for account deletion/takedown/identity events on firehose
merge conflicts detected
expand
collapse
expand
collapse
- internal/firehose/consumer.go:27
- internal/firehose/index.go:459
- internal/firehose/index_test.go:609
- internal/firehose/profile_watcher.go:68