+328
-10
Diff
round #0
+25
-4
internal/firehose/consumer.go
+25
-4
internal/firehose/consumer.go
···
27
27
CID string `json:"cid"`
28
28
}
29
29
30
+
// JetstreamIdentity is the "identity" payload carried by a Jetstream event.
// Jetstream emits it when a DID's handle or PDS endpoint changes.
type JetstreamIdentity struct {
	// DID is the account the identity update refers to.
	DID string `json:"did"`
	// Handle is the handle reported with the update; it may be absent on the wire.
	Handle string `json:"handle,omitempty"`
	// Seq is the sequence number attached to the event.
	Seq int64 `json:"seq"`
	// Time is the event timestamp, kept as the raw string received.
	Time string `json:"time"`
}
38
+
39
+
// JetstreamAccount is the "account" payload carried by a Jetstream event.
// Status is one of "active", "deleted", "deactivated", "suspended", "takendown".
type JetstreamAccount struct {
	// Active is the account's active flag as reported by the event.
	Active bool `json:"active"`
	// DID is the account the status change refers to.
	DID string `json:"did"`
	// Seq is the sequence number attached to the event.
	Seq int64 `json:"seq"`
	// Status is the reported account status; it may be absent on the wire.
	Status string `json:"status,omitempty"`
	// Time is the event timestamp, kept as the raw string received.
	Time string `json:"time"`
}
48
+
30
49
// JetstreamEvent represents an event from Jetstream
31
50
type JetstreamEvent struct {
32
-
DID string `json:"did"`
33
-
TimeUS int64 `json:"time_us"`
34
-
Kind string `json:"kind"` // "commit", "identity", "account"
35
-
Commit *JetstreamCommit `json:"commit,omitempty"`
51
+
DID string `json:"did"`
52
+
TimeUS int64 `json:"time_us"`
53
+
Kind string `json:"kind"` // "commit", "identity", "account"
54
+
Commit *JetstreamCommit `json:"commit,omitempty"`
55
+
Identity *JetstreamIdentity `json:"identity,omitempty"`
56
+
Account *JetstreamAccount `json:"account,omitempty"`
36
57
}
37
58
38
59
// Consumer consumes events from Jetstream and indexes them
+54
internal/firehose/index.go
+54
internal/firehose/index.go
···
459
459
return err
460
460
}
461
461
462
+
// DeleteAllByDID removes all data associated with a DID from the index.
463
+
// Used when a Jetstream account event reports the DID as deleted or takendown.
464
+
//
465
+
// Removes: records authored by the DID; likes/comments by the DID; likes/comments
466
+
// targeting the DID's records; profile cache; notifications to or from the DID;
467
+
// known/registered/backfilled tracking; user settings.
468
+
//
469
+
// Preserves moderation_* tables (reports, audit log, blacklist, labels, hidden
470
+
// records, autohide resets) โ those are evidence of moderation actions and
471
+
// should outlive the account.
472
+
func (idx *FeedIndex) DeleteAllByDID(ctx context.Context, did string) error {
473
+
uriPrefix := fmt.Sprintf("at://%s/%%", did)
474
+
475
+
tx, err := idx.db.BeginTx(ctx, nil)
476
+
if err != nil {
477
+
return fmt.Errorf("begin tx: %w", err)
478
+
}
479
+
defer tx.Rollback() //nolint:errcheck
480
+
481
+
stmts := []struct {
482
+
sql string
483
+
args []any
484
+
}{
485
+
{`DELETE FROM records WHERE did = ?`, []any{did}},
486
+
{`DELETE FROM likes WHERE actor_did = ?`, []any{did}},
487
+
{`DELETE FROM likes WHERE subject_uri LIKE ?`, []any{uriPrefix}},
488
+
{`DELETE FROM comments WHERE actor_did = ?`, []any{did}},
489
+
{`DELETE FROM comments WHERE subject_uri LIKE ?`, []any{uriPrefix}},
490
+
{`DELETE FROM notifications WHERE target_did = ? OR actor_did = ?`, []any{did, did}},
491
+
{`DELETE FROM notifications_meta WHERE target_did = ?`, []any{did}},
492
+
{`DELETE FROM profiles WHERE did = ?`, []any{did}},
493
+
{`DELETE FROM known_dids WHERE did = ?`, []any{did}},
494
+
{`DELETE FROM registered_dids WHERE did = ?`, []any{did}},
495
+
{`DELETE FROM backfilled WHERE did = ?`, []any{did}},
496
+
{`DELETE FROM user_settings WHERE did = ?`, []any{did}},
497
+
}
498
+
499
+
for _, s := range stmts {
500
+
if _, err := tx.ExecContext(ctx, s.sql, s.args...); err != nil {
501
+
return fmt.Errorf("delete by did: %w", err)
502
+
}
503
+
}
504
+
505
+
if err := tx.Commit(); err != nil {
506
+
return fmt.Errorf("commit: %w", err)
507
+
}
508
+
509
+
idx.profileCacheMu.Lock()
510
+
delete(idx.profileCache, did)
511
+
idx.profileCacheMu.Unlock()
512
+
513
+
return nil
514
+
}
515
+
462
516
// UpsertWitnessRecord implements atproto.WitnessCache for write-through caching.
463
517
func (idx *FeedIndex) UpsertWitnessRecord(ctx context.Context, did, collection, rkey, cid string, record json.RawMessage) error {
464
518
return idx.UpsertRecord(ctx, did, collection, rkey, cid, record, time.Now().UnixMicro())
+86
internal/firehose/index_test.go
+86
internal/firehose/index_test.go
···
609
609
610
610
assert.Equal(t, 0, idx.RecordCount(), "all records should be deleted")
611
611
}
612
+
613
+
func TestDeleteAllByDID(t *testing.T) {
614
+
tmpDir := t.TempDir()
615
+
idx, err := NewFeedIndex(tmpDir+"/test.db", 1*time.Hour)
616
+
assert.NoError(t, err)
617
+
defer idx.Close()
618
+
619
+
ctx := context.Background()
620
+
target := "did:plc:victim"
621
+
other := "did:plc:bystander"
622
+
now := time.Now().Unix()
623
+
624
+
// Records: target owns 2, other owns 1
625
+
bean := []byte(`{"$type":"social.arabica.alpha.bean","name":"Bean","createdAt":"2025-01-01T00:00:00Z"}`)
626
+
brew := []byte(`{"$type":"social.arabica.alpha.brew","createdAt":"2025-01-02T00:00:00Z"}`)
627
+
assert.NoError(t, idx.UpsertRecord(ctx, target, "social.arabica.alpha.bean", "b1", "cid1", bean, now))
628
+
assert.NoError(t, idx.UpsertRecord(ctx, target, "social.arabica.alpha.brew", "br1", "cid2", brew, now))
629
+
assert.NoError(t, idx.UpsertRecord(ctx, other, "social.arabica.alpha.bean", "b2", "cid3", bean, now))
630
+
631
+
// Likes: target -> other's record, other -> target's record, other -> other's record
632
+
targetBeanURI := "at://" + target + "/social.arabica.alpha.bean/b1"
633
+
otherBeanURI := "at://" + other + "/social.arabica.alpha.bean/b2"
634
+
assert.NoError(t, idx.UpsertLike(ctx, target, "lk1", otherBeanURI))
635
+
assert.NoError(t, idx.UpsertLike(ctx, other, "lk2", targetBeanURI))
636
+
assert.NoError(t, idx.UpsertLike(ctx, other, "lk3", otherBeanURI))
637
+
638
+
// Comments mirroring the like pattern
639
+
createdAt := time.Now()
640
+
assert.NoError(t, idx.UpsertComment(ctx, target, "c1", otherBeanURI, "", "cidc1", "by target", createdAt))
641
+
assert.NoError(t, idx.UpsertComment(ctx, other, "c2", targetBeanURI, "", "cidc2", "on target", createdAt))
642
+
assert.NoError(t, idx.UpsertComment(ctx, other, "c3", otherBeanURI, "", "cidc3", "untouched", createdAt))
643
+
644
+
// Backfill marker for target
645
+
assert.NoError(t, idx.MarkBackfilled(ctx, target))
646
+
assert.True(t, idx.IsBackfilled(ctx, target))
647
+
648
+
// Settings for target
649
+
assert.NoError(t, idx.SetProfileStatsVisibility(ctx, target, models.ProfileStatsVisibility{}))
650
+
651
+
// Notification: target receives a like from other
652
+
idx.CreateLikeNotification(other, targetBeanURI)
653
+
654
+
// Sanity: counts before
655
+
assert.Equal(t, 3, idx.RecordCount())
656
+
assert.Equal(t, 3, idx.TotalLikeCount())
657
+
assert.Equal(t, 3, idx.TotalCommentCount())
658
+
659
+
// Act
660
+
assert.NoError(t, idx.DeleteAllByDID(ctx, target))
661
+
662
+
// Records: only other's bean remains
663
+
assert.Equal(t, 1, idx.RecordCount())
664
+
rec, err := idx.GetRecord(ctx, otherBeanURI)
665
+
assert.NoError(t, err)
666
+
assert.NotNil(t, rec)
667
+
668
+
// Likes: only other->other survives (target's like and the like ON target's record gone)
669
+
assert.Equal(t, 1, idx.TotalLikeCount())
670
+
assert.Equal(t, 0, idx.GetLikeCount(ctx, targetBeanURI))
671
+
assert.Equal(t, 1, idx.GetLikeCount(ctx, otherBeanURI))
672
+
assert.False(t, idx.HasUserLiked(ctx, target, otherBeanURI))
673
+
674
+
// Comments: only the third comment survives
675
+
assert.Equal(t, 1, idx.TotalCommentCount())
676
+
assert.Equal(t, 0, idx.GetCommentCount(ctx, targetBeanURI))
677
+
assert.Equal(t, 1, idx.GetCommentCount(ctx, otherBeanURI))
678
+
679
+
// Backfill cleared
680
+
assert.False(t, idx.IsBackfilled(ctx, target))
681
+
682
+
// Profile cache cleared (in-memory)
683
+
idx.profileCacheMu.RLock()
684
+
_, present := idx.profileCache[target]
685
+
idx.profileCacheMu.RUnlock()
686
+
assert.False(t, present)
687
+
}
688
+
689
+
func TestDeleteAllByDID_NoData(t *testing.T) {
690
+
tmpDir := t.TempDir()
691
+
idx, err := NewFeedIndex(tmpDir+"/test.db", 1*time.Hour)
692
+
assert.NoError(t, err)
693
+
defer idx.Close()
694
+
695
+
// Should be a no-op for an unknown DID
696
+
assert.NoError(t, idx.DeleteAllByDID(context.Background(), "did:plc:ghost"))
697
+
}
+56
-6
internal/firehose/profile_watcher.go
+56
-6
internal/firehose/profile_watcher.go
···
68
68
}
69
69
}
70
70
71
+
// Unwatch removes a DID from the subscription. Used when an account is deleted
72
+
// so Jetstream stops sending events for that DID.
73
+
func (pw *ProfileWatcher) Unwatch(did string) {
74
+
pw.watchedDIDsMu.Lock()
75
+
_, present := pw.watchedDIDs[did]
76
+
delete(pw.watchedDIDs, did)
77
+
pw.watchedDIDsMu.Unlock()
78
+
79
+
if present {
80
+
pw.sendOptionsUpdate()
81
+
}
82
+
}
83
+
71
84
// Start begins the profile watcher in a background goroutine. It will reconnect
72
85
// automatically on failure, rotating through endpoints with exponential backoff.
73
86
func (pw *ProfileWatcher) Start(ctx context.Context) {
···
269
282
270
283
func (pw *ProfileWatcher) processMessage(data []byte) {
271
284
var event JetstreamEvent
272
-
if err := json.Unmarshal(data, &event); err != nil || event.Kind != "commit" || event.Commit == nil {
273
-
return
274
-
}
275
-
if event.Commit.Collection != NSIDBlueskyProfile {
285
+
if err := json.Unmarshal(data, &event); err != nil {
276
286
return
277
287
}
278
-
if event.Commit.Operation == "create" || event.Commit.Operation == "update" {
288
+
289
+
switch event.Kind {
290
+
case "commit":
291
+
if event.Commit == nil || event.Commit.Collection != NSIDBlueskyProfile {
292
+
return
293
+
}
294
+
if event.Commit.Operation == "create" || event.Commit.Operation == "update" {
295
+
pw.index.RefreshProfile(context.Background(), event.DID)
296
+
log.Debug().Str("did", event.DID).Msg("profile watcher: refreshed profile cache")
297
+
}
298
+
299
+
case "identity":
300
+
// Handle change or PDS migration โ refresh the cached profile so handle
301
+
// resolution stays accurate. Profile-commit events don't fire on handle
302
+
// changes, so this is the only signal we get.
279
303
pw.index.RefreshProfile(context.Background(), event.DID)
280
-
log.Debug().Str("did", event.DID).Msg("profile watcher: refreshed profile cache")
304
+
handle := ""
305
+
if event.Identity != nil {
306
+
handle = event.Identity.Handle
307
+
}
308
+
log.Info().Str("did", event.DID).Str("handle", handle).Msg("profile watcher: identity update, refreshed profile")
309
+
310
+
case "account":
311
+
if event.Account == nil {
312
+
return
313
+
}
314
+
status := event.Account.Status
315
+
log.Info().
316
+
Str("did", event.DID).
317
+
Str("status", status).
318
+
Bool("active", event.Account.Active).
319
+
Msg("profile watcher: account event")
320
+
321
+
// Only act on terminal states. deactivated/suspended are reversible โ
322
+
// we keep the data so it reappears if the account comes back.
323
+
if status == "deleted" || status == "takendown" {
324
+
if err := pw.index.DeleteAllByDID(context.Background(), event.DID); err != nil {
325
+
log.Error().Err(err).Str("did", event.DID).Str("status", status).Msg("profile watcher: failed to delete user data")
326
+
return
327
+
}
328
+
log.Warn().Str("did", event.DID).Str("status", status).Msg("profile watcher: purged all data for account")
329
+
pw.Unwatch(event.DID)
330
+
}
281
331
}
282
332
}
+107
internal/firehose/profile_watcher_test.go
+107
internal/firehose/profile_watcher_test.go
···
1
+
package firehose
2
+
3
+
import (
4
+
"context"
5
+
"testing"
6
+
"time"
7
+
8
+
"github.com/stretchr/testify/assert"
9
+
)
10
+
11
+
func TestProfileWatcher_AccountDeleted_PurgesData(t *testing.T) {
12
+
tmpDir := t.TempDir()
13
+
idx, err := NewFeedIndex(tmpDir+"/test.db", 1*time.Hour)
14
+
assert.NoError(t, err)
15
+
defer idx.Close()
16
+
17
+
ctx := context.Background()
18
+
target := "did:plc:victim"
19
+
20
+
bean := []byte(`{"$type":"social.arabica.alpha.bean","name":"Bean","createdAt":"2025-01-01T00:00:00Z"}`)
21
+
assert.NoError(t, idx.UpsertRecord(ctx, target, "social.arabica.alpha.bean", "b1", "cid1", bean, time.Now().Unix()))
22
+
assert.Equal(t, 1, idx.RecordCount())
23
+
24
+
pw := &ProfileWatcher{
25
+
index: idx,
26
+
watchedDIDs: map[string]struct{}{target: {}},
27
+
stopCh: make(chan struct{}),
28
+
}
29
+
30
+
event := []byte(`{
31
+
"did":"did:plc:victim",
32
+
"time_us":1700000000000000,
33
+
"kind":"account",
34
+
"account":{"active":false,"did":"did:plc:victim","seq":1,"status":"deleted","time":"2025-01-03T00:00:00Z"}
35
+
}`)
36
+
pw.processMessage(event)
37
+
38
+
assert.Equal(t, 0, idx.RecordCount(), "records should be purged on account deletion")
39
+
40
+
pw.watchedDIDsMu.RLock()
41
+
_, present := pw.watchedDIDs[target]
42
+
pw.watchedDIDsMu.RUnlock()
43
+
assert.False(t, present, "DID should be unwatched after deletion")
44
+
}
45
+
46
+
func TestProfileWatcher_AccountDeactivated_KeepsData(t *testing.T) {
47
+
tmpDir := t.TempDir()
48
+
idx, err := NewFeedIndex(tmpDir+"/test.db", 1*time.Hour)
49
+
assert.NoError(t, err)
50
+
defer idx.Close()
51
+
52
+
ctx := context.Background()
53
+
target := "did:plc:victim"
54
+
55
+
bean := []byte(`{"$type":"social.arabica.alpha.bean","name":"Bean","createdAt":"2025-01-01T00:00:00Z"}`)
56
+
assert.NoError(t, idx.UpsertRecord(ctx, target, "social.arabica.alpha.bean", "b1", "cid1", bean, time.Now().Unix()))
57
+
58
+
pw := &ProfileWatcher{
59
+
index: idx,
60
+
watchedDIDs: map[string]struct{}{target: {}},
61
+
stopCh: make(chan struct{}),
62
+
}
63
+
64
+
event := []byte(`{
65
+
"did":"did:plc:victim",
66
+
"time_us":1700000000000000,
67
+
"kind":"account",
68
+
"account":{"active":false,"did":"did:plc:victim","seq":1,"status":"deactivated","time":"2025-01-03T00:00:00Z"}
69
+
}`)
70
+
pw.processMessage(event)
71
+
72
+
assert.Equal(t, 1, idx.RecordCount(), "deactivated accounts are reversible โ keep data")
73
+
74
+
pw.watchedDIDsMu.RLock()
75
+
_, present := pw.watchedDIDs[target]
76
+
pw.watchedDIDsMu.RUnlock()
77
+
assert.True(t, present, "DID should remain watched on reversible status changes")
78
+
}
79
+
80
+
func TestProfileWatcher_AccountTakendown_PurgesData(t *testing.T) {
81
+
tmpDir := t.TempDir()
82
+
idx, err := NewFeedIndex(tmpDir+"/test.db", 1*time.Hour)
83
+
assert.NoError(t, err)
84
+
defer idx.Close()
85
+
86
+
ctx := context.Background()
87
+
target := "did:plc:victim"
88
+
89
+
bean := []byte(`{"$type":"social.arabica.alpha.bean","name":"Bean","createdAt":"2025-01-01T00:00:00Z"}`)
90
+
assert.NoError(t, idx.UpsertRecord(ctx, target, "social.arabica.alpha.bean", "b1", "cid1", bean, time.Now().Unix()))
91
+
92
+
pw := &ProfileWatcher{
93
+
index: idx,
94
+
watchedDIDs: map[string]struct{}{target: {}},
95
+
stopCh: make(chan struct{}),
96
+
}
97
+
98
+
event := []byte(`{
99
+
"did":"did:plc:victim",
100
+
"time_us":1700000000000000,
101
+
"kind":"account",
102
+
"account":{"active":false,"did":"did:plc:victim","seq":1,"status":"takendown","time":"2025-01-03T00:00:00Z"}
103
+
}`)
104
+
pw.processMessage(event)
105
+
106
+
assert.Equal(t, 0, idx.RecordCount(), "takendown accounts should be purged")
107
+
}
History
1 round
0 comments
pdewey.com
submitted
#0
1 commit
expand
collapse
feat: handle account delete and identity update events
merge conflicts detected
expand
collapse
expand
collapse
- internal/firehose/consumer.go:27
- internal/firehose/index.go:459
- internal/firehose/index_test.go:609
- internal/firehose/profile_watcher.go:68