Coffee journaling on ATProto (alpha) alpha.arabica.social
coffee
17
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: listen for account deletion/takedown/identity events on firehose #17

open opened by pdewey.com targeting main from push-qtmrwnrptzrz
Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:hm5f3dnm6jdhrc55qp2npdja/sh.tangled.repo.pull/3ml7z32myem22
+114 -3
Diff #0
+11
internal/firehose/profile_watcher.go
··· 280 280 } 281 281 } 282 282 283 + // ProcessEvent processes a single Jetstream event through the profile-watcher 284 + // pipeline. Exported for use in integration tests where events are fed from a 285 + // test PDS firehose rather than a live Jetstream connection. Production code 286 + // reaches this path via processMessage. 287 + func (pw *ProfileWatcher) ProcessEvent(event JetstreamEvent) { 288 + pw.dispatch(event) 289 + } 290 + 283 291 func (pw *ProfileWatcher) processMessage(data []byte) { 284 292 var event JetstreamEvent 285 293 if err := json.Unmarshal(data, &event); err != nil { 286 294 return 287 295 } 296 + pw.dispatch(event) 297 + } 288 298 299 + func (pw *ProfileWatcher) dispatch(event JetstreamEvent) { 289 300 switch event.Kind { 290 301 case "commit": 291 302 if event.Commit == nil || event.Commit.Collection != NSIDBlueskyProfile {
+58
tests/integration/firehose_test.go
··· 455 455 } 456 456 } 457 457 458 + // TestFirehose_AccountDeactivated_KeepsData verifies an end-to-end account 459 + // status change: deactivating an account on the test PDS emits a real #account 460 + // firehose event (status=deactivated), which our bridge forwards to the 461 + // ProfileWatcher. Because deactivation is reversible, indexed records must NOT 462 + // be purged. 463 + func TestFirehose_AccountDeactivated_KeepsData(t *testing.T) { 464 + h := StartHarness(t, &HarnessOptions{EnableFirehose: true}) 465 + ctx := context.Background() 466 + 467 + rkey := mustRKey(t, h.PostForm("/api/roasters", form("name", "Survives Deactivation")), "roaster") 468 + uri := atproto.BuildATURI(h.PrimaryAccount.DID, atproto.NSIDRoaster, rkey) 469 + h.WaitForRecord(uri, firehoseWait) 470 + 471 + apiClient := h.accounts[h.PrimaryAccount.DID] 472 + require.NotNil(t, apiClient) 473 + 474 + err := apiClient.Post(ctx, "com.atproto.server.deactivateAccount", map[string]any{}, nil) 475 + require.NoError(t, err) 476 + 477 + // Give the firehose bridge time to dispatch the event. 478 + time.Sleep(500 * time.Millisecond) 479 + 480 + rec, err := h.FeedIndex.GetRecord(ctx, uri) 481 + assert.NoError(t, err) 482 + assert.NotNil(t, rec, "deactivated accounts are reversible โ€” record should remain indexed") 483 + } 484 + 485 + // TestFirehose_AccountDeleted_PurgesData verifies the deletion path through 486 + // the bridge โ†’ ProfileWatcher โ†’ DeleteAllByDID pipeline. testpds gates real 487 + // account deletion behind a server-side token (not exposed to clients), so we 488 + // synthesize the account event directly into the watcher to exercise the 489 + // integration wiring without bypassing it. 490 + func TestFirehose_AccountDeleted_PurgesData(t *testing.T) { 491 + h := StartHarness(t, &HarnessOptions{EnableFirehose: true}) 492 + ctx := context.Background() 493 + 494 + rkey := mustRKey(t, h.PostForm("/api/roasters", form("name", "About to be Purged")), "roaster") 495 + uri := atproto.BuildATURI(h.PrimaryAccount.DID, atproto.NSIDRoaster, rkey) 496 + h.WaitForRecord(uri, firehoseWait) 497 + 498 + require.NotNil(t, h.ProfileWatcher) 499 + h.ProfileWatcher.ProcessEvent(firehose.JetstreamEvent{ 500 + DID: h.PrimaryAccount.DID, 501 + TimeUS: time.Now().UnixMicro(), 502 + Kind: "account", 503 + Account: &firehose.JetstreamAccount{ 504 + Active: false, 505 + DID: h.PrimaryAccount.DID, 506 + Status: "deleted", 507 + Time: time.Now().Format(time.RFC3339), 508 + }, 509 + }) 510 + 511 + rec, err := h.FeedIndex.GetRecord(ctx, uri) 512 + assert.NoError(t, err) 513 + assert.Nil(t, rec, "deleted account's records should be purged from index") 514 + } 515 + 458 516 // --- helpers --- 459 517 460 518 // waitFor polls condition until it returns true or the timeout expires.
+45 -3
tests/integration/harness.go
··· 88 88 PDS *testpds.TestPDS 89 89 Server *httptest.Server 90 90 Handler *handlers.Handler 91 - FeedIndex *firehose.FeedIndex 92 - Consumer *firehose.Consumer 93 - SessionCache *atproto.SessionCache 91 + FeedIndex *firehose.FeedIndex 92 + Consumer *firehose.Consumer 93 + ProfileWatcher *firehose.ProfileWatcher 94 + SessionCache *atproto.SessionCache 94 95 95 96 // PrimaryAccount is the default account created on harness setup. 96 97 PrimaryAccount TestAccount ··· 237 238 WantedCollections: firehose.ArabicaCollections, 238 239 }, feedIndex) 239 240 harness.Consumer = consumer 241 + harness.ProfileWatcher = firehose.NewProfileWatcher(&firehose.Config{}, feedIndex) 240 242 241 243 ctx, cancel := context.WithCancel(context.Background()) 242 244 harness.firehoseCancel = cancel ··· 515 517 if !ok { 516 518 return 517 519 } 520 + 521 + if evt.Identity != nil && h.ProfileWatcher != nil { 522 + handle := "" 523 + if evt.Identity.Handle != nil { 524 + handle = *evt.Identity.Handle 525 + } 526 + h.ProfileWatcher.ProcessEvent(firehose.JetstreamEvent{ 527 + DID: evt.Identity.Did, 528 + TimeUS: time.Now().UnixMicro(), 529 + Kind: "identity", 530 + Identity: &firehose.JetstreamIdentity{ 531 + DID: evt.Identity.Did, 532 + Handle: handle, 533 + Seq: evt.Identity.Seq, 534 + Time: evt.Identity.Time, 535 + }, 536 + }) 537 + continue 538 + } 539 + 540 + if evt.Account != nil && h.ProfileWatcher != nil { 541 + status := "" 542 + if evt.Account.Status != nil { 543 + status = *evt.Account.Status 544 + } 545 + h.ProfileWatcher.ProcessEvent(firehose.JetstreamEvent{ 546 + DID: evt.Account.Did, 547 + TimeUS: time.Now().UnixMicro(), 548 + Kind: "account", 549 + Account: &firehose.JetstreamAccount{ 550 + Active: evt.Account.Active, 551 + DID: evt.Account.Did, 552 + Seq: evt.Account.Seq, 553 + Status: status, 554 + Time: evt.Account.Time, 555 + }, 556 + }) 557 + continue 558 + } 559 + 518 560 if evt.Commit == nil { 519 561 continue 520 562 }

History

1 round 0 comments
sign up or login to add to the discussion
pdewey.com submitted #0
1 commit
expand
feat: listen for account deletion/takedown/identity events on firehose
merge conflicts detected
expand
  • internal/firehose/consumer.go:27
  • internal/firehose/index.go:459
  • internal/firehose/index_test.go:609
  • internal/firehose/profile_watcher.go:68
expand 0 comments