Coffee journaling on ATProto (alpha) alpha.arabica.social
coffee
17
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: handle account delete and identity update events #16

open opened by pdewey.com targeting main from push-qtmrwnrptzrz
Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:hm5f3dnm6jdhrc55qp2npdja/sh.tangled.repo.pull/3ml7z32my4z22
+328 -10
Diff #0
+25 -4
internal/firehose/consumer.go
··· 27 27 CID string `json:"cid"` 28 28 } 29 29 30 + // JetstreamIdentity represents the identity payload of a Jetstream event. 31 + // Emitted when a DID's handle or PDS endpoint changes. 32 + type JetstreamIdentity struct { 33 + DID string `json:"did"` 34 + Handle string `json:"handle,omitempty"` 35 + Seq int64 `json:"seq"` 36 + Time string `json:"time"` 37 + } 38 + 39 + // JetstreamAccount represents the account payload of a Jetstream event. 40 + // Status is one of "active", "deleted", "deactivated", "suspended", "takendown". 41 + type JetstreamAccount struct { 42 + Active bool `json:"active"` 43 + DID string `json:"did"` 44 + Seq int64 `json:"seq"` 45 + Status string `json:"status,omitempty"` 46 + Time string `json:"time"` 47 + } 48 + 30 49 // JetstreamEvent represents an event from Jetstream 31 50 type JetstreamEvent struct { 32 - DID string `json:"did"` 33 - TimeUS int64 `json:"time_us"` 34 - Kind string `json:"kind"` // "commit", "identity", "account" 35 - Commit *JetstreamCommit `json:"commit,omitempty"` 51 + DID string `json:"did"` 52 + TimeUS int64 `json:"time_us"` 53 + Kind string `json:"kind"` // "commit", "identity", "account" 54 + Commit *JetstreamCommit `json:"commit,omitempty"` 55 + Identity *JetstreamIdentity `json:"identity,omitempty"` 56 + Account *JetstreamAccount `json:"account,omitempty"` 36 57 } 37 58 38 59 // Consumer consumes events from Jetstream and indexes them
+54
internal/firehose/index.go
··· 459 459 return err 460 460 } 461 461 462 + // DeleteAllByDID removes all data associated with a DID from the index. 463 + // Used when a Jetstream account event reports the DID as deleted or takendown. 464 + // 465 + // Removes: records authored by the DID; likes/comments by the DID; likes/comments 466 + // targeting the DID's records; profile cache; notifications to or from the DID; 467 + // known/registered/backfilled tracking; user settings. 468 + // 469 + // Preserves moderation_* tables (reports, audit log, blacklist, labels, hidden 470 + // records, autohide resets) — those are evidence of moderation actions and 471 + // should outlive the account. 472 + func (idx *FeedIndex) DeleteAllByDID(ctx context.Context, did string) error { 473 + uriPrefix := fmt.Sprintf("at://%s/%%", did) 474 + 475 + tx, err := idx.db.BeginTx(ctx, nil) 476 + if err != nil { 477 + return fmt.Errorf("begin tx: %w", err) 478 + } 479 + defer tx.Rollback() //nolint:errcheck 480 + 481 + stmts := []struct { 482 + sql string 483 + args []any 484 + }{ 485 + {`DELETE FROM records WHERE did = ?`, []any{did}}, 486 + {`DELETE FROM likes WHERE actor_did = ?`, []any{did}}, 487 + {`DELETE FROM likes WHERE subject_uri LIKE ?`, []any{uriPrefix}}, 488 + {`DELETE FROM comments WHERE actor_did = ?`, []any{did}}, 489 + {`DELETE FROM comments WHERE subject_uri LIKE ?`, []any{uriPrefix}}, 490 + {`DELETE FROM notifications WHERE target_did = ? 
OR actor_did = ?`, []any{did, did}}, 491 + {`DELETE FROM notifications_meta WHERE target_did = ?`, []any{did}}, 492 + {`DELETE FROM profiles WHERE did = ?`, []any{did}}, 493 + {`DELETE FROM known_dids WHERE did = ?`, []any{did}}, 494 + {`DELETE FROM registered_dids WHERE did = ?`, []any{did}}, 495 + {`DELETE FROM backfilled WHERE did = ?`, []any{did}}, 496 + {`DELETE FROM user_settings WHERE did = ?`, []any{did}}, 497 + } 498 + 499 + for _, s := range stmts { 500 + if _, err := tx.ExecContext(ctx, s.sql, s.args...); err != nil { 501 + return fmt.Errorf("delete by did: %w", err) 502 + } 503 + } 504 + 505 + if err := tx.Commit(); err != nil { 506 + return fmt.Errorf("commit: %w", err) 507 + } 508 + 509 + idx.profileCacheMu.Lock() 510 + delete(idx.profileCache, did) 511 + idx.profileCacheMu.Unlock() 512 + 513 + return nil 514 + } 515 + 462 516 // UpsertWitnessRecord implements atproto.WitnessCache for write-through caching. 463 517 func (idx *FeedIndex) UpsertWitnessRecord(ctx context.Context, did, collection, rkey, cid string, record json.RawMessage) error { 464 518 return idx.UpsertRecord(ctx, did, collection, rkey, cid, record, time.Now().UnixMicro())
+86
internal/firehose/index_test.go
··· 609 609 610 610 assert.Equal(t, 0, idx.RecordCount(), "all records should be deleted") 611 611 } 612 + 613 + func TestDeleteAllByDID(t *testing.T) { 614 + tmpDir := t.TempDir() 615 + idx, err := NewFeedIndex(tmpDir+"/test.db", 1*time.Hour) 616 + assert.NoError(t, err) 617 + defer idx.Close() 618 + 619 + ctx := context.Background() 620 + target := "did:plc:victim" 621 + other := "did:plc:bystander" 622 + now := time.Now().Unix() 623 + 624 + // Records: target owns 2, other owns 1 625 + bean := []byte(`{"$type":"social.arabica.alpha.bean","name":"Bean","createdAt":"2025-01-01T00:00:00Z"}`) 626 + brew := []byte(`{"$type":"social.arabica.alpha.brew","createdAt":"2025-01-02T00:00:00Z"}`) 627 + assert.NoError(t, idx.UpsertRecord(ctx, target, "social.arabica.alpha.bean", "b1", "cid1", bean, now)) 628 + assert.NoError(t, idx.UpsertRecord(ctx, target, "social.arabica.alpha.brew", "br1", "cid2", brew, now)) 629 + assert.NoError(t, idx.UpsertRecord(ctx, other, "social.arabica.alpha.bean", "b2", "cid3", bean, now)) 630 + 631 + // Likes: target -> other's record, other -> target's record, other -> other's record 632 + targetBeanURI := "at://" + target + "/social.arabica.alpha.bean/b1" 633 + otherBeanURI := "at://" + other + "/social.arabica.alpha.bean/b2" 634 + assert.NoError(t, idx.UpsertLike(ctx, target, "lk1", otherBeanURI)) 635 + assert.NoError(t, idx.UpsertLike(ctx, other, "lk2", targetBeanURI)) 636 + assert.NoError(t, idx.UpsertLike(ctx, other, "lk3", otherBeanURI)) 637 + 638 + // Comments mirroring the like pattern 639 + createdAt := time.Now() 640 + assert.NoError(t, idx.UpsertComment(ctx, target, "c1", otherBeanURI, "", "cidc1", "by target", createdAt)) 641 + assert.NoError(t, idx.UpsertComment(ctx, other, "c2", targetBeanURI, "", "cidc2", "on target", createdAt)) 642 + assert.NoError(t, idx.UpsertComment(ctx, other, "c3", otherBeanURI, "", "cidc3", "untouched", createdAt)) 643 + 644 + // Backfill marker for target 645 + assert.NoError(t, idx.MarkBackfilled(ctx, 
target)) 646 + assert.True(t, idx.IsBackfilled(ctx, target)) 647 + 648 + // Settings for target 649 + assert.NoError(t, idx.SetProfileStatsVisibility(ctx, target, models.ProfileStatsVisibility{})) 650 + 651 + // Notification: target receives a like from other 652 + idx.CreateLikeNotification(other, targetBeanURI) 653 + 654 + // Sanity: counts before 655 + assert.Equal(t, 3, idx.RecordCount()) 656 + assert.Equal(t, 3, idx.TotalLikeCount()) 657 + assert.Equal(t, 3, idx.TotalCommentCount()) 658 + 659 + // Act 660 + assert.NoError(t, idx.DeleteAllByDID(ctx, target)) 661 + 662 + // Records: only other's bean remains 663 + assert.Equal(t, 1, idx.RecordCount()) 664 + rec, err := idx.GetRecord(ctx, otherBeanURI) 665 + assert.NoError(t, err) 666 + assert.NotNil(t, rec) 667 + 668 + // Likes: only other->other survives (target's like and the like ON target's record gone) 669 + assert.Equal(t, 1, idx.TotalLikeCount()) 670 + assert.Equal(t, 0, idx.GetLikeCount(ctx, targetBeanURI)) 671 + assert.Equal(t, 1, idx.GetLikeCount(ctx, otherBeanURI)) 672 + assert.False(t, idx.HasUserLiked(ctx, target, otherBeanURI)) 673 + 674 + // Comments: only the third comment survives 675 + assert.Equal(t, 1, idx.TotalCommentCount()) 676 + assert.Equal(t, 0, idx.GetCommentCount(ctx, targetBeanURI)) 677 + assert.Equal(t, 1, idx.GetCommentCount(ctx, otherBeanURI)) 678 + 679 + // Backfill cleared 680 + assert.False(t, idx.IsBackfilled(ctx, target)) 681 + 682 + // Profile cache cleared (in-memory) 683 + idx.profileCacheMu.RLock() 684 + _, present := idx.profileCache[target] 685 + idx.profileCacheMu.RUnlock() 686 + assert.False(t, present) 687 + } 688 + 689 + func TestDeleteAllByDID_NoData(t *testing.T) { 690 + tmpDir := t.TempDir() 691 + idx, err := NewFeedIndex(tmpDir+"/test.db", 1*time.Hour) 692 + assert.NoError(t, err) 693 + defer idx.Close() 694 + 695 + // Should be a no-op for an unknown DID 696 + assert.NoError(t, idx.DeleteAllByDID(context.Background(), "did:plc:ghost")) 697 + }
+56 -6
internal/firehose/profile_watcher.go
··· 68 68 } 69 69 } 70 70 71 + // Unwatch removes a DID from the subscription. Used when an account is deleted 72 + // so Jetstream stops sending events for that DID. 73 + func (pw *ProfileWatcher) Unwatch(did string) { 74 + pw.watchedDIDsMu.Lock() 75 + _, present := pw.watchedDIDs[did] 76 + delete(pw.watchedDIDs, did) 77 + pw.watchedDIDsMu.Unlock() 78 + 79 + if present { 80 + pw.sendOptionsUpdate() 81 + } 82 + } 83 + 71 84 // Start begins the profile watcher in a background goroutine. It will reconnect 72 85 // automatically on failure, rotating through endpoints with exponential backoff. 73 86 func (pw *ProfileWatcher) Start(ctx context.Context) { ··· 269 282 270 283 func (pw *ProfileWatcher) processMessage(data []byte) { 271 284 var event JetstreamEvent 272 - if err := json.Unmarshal(data, &event); err != nil || event.Kind != "commit" || event.Commit == nil { 273 - return 274 - } 275 - if event.Commit.Collection != NSIDBlueskyProfile { 285 + if err := json.Unmarshal(data, &event); err != nil { 276 286 return 277 287 } 278 - if event.Commit.Operation == "create" || event.Commit.Operation == "update" { 288 + 289 + switch event.Kind { 290 + case "commit": 291 + if event.Commit == nil || event.Commit.Collection != NSIDBlueskyProfile { 292 + return 293 + } 294 + if event.Commit.Operation == "create" || event.Commit.Operation == "update" { 295 + pw.index.RefreshProfile(context.Background(), event.DID) 296 + log.Debug().Str("did", event.DID).Msg("profile watcher: refreshed profile cache") 297 + } 298 + 299 + case "identity": 300 + // Handle change or PDS migration — refresh the cached profile so handle 301 + // resolution stays accurate. Profile-commit events don't fire on handle 302 + // changes, so this is the only signal we get. 
279 303 pw.index.RefreshProfile(context.Background(), event.DID) 280 - log.Debug().Str("did", event.DID).Msg("profile watcher: refreshed profile cache") 304 + handle := "" 305 + if event.Identity != nil { 306 + handle = event.Identity.Handle 307 + } 308 + log.Info().Str("did", event.DID).Str("handle", handle).Msg("profile watcher: identity update, refreshed profile") 309 + 310 + case "account": 311 + if event.Account == nil { 312 + return 313 + } 314 + status := event.Account.Status 315 + log.Info(). 316 + Str("did", event.DID). 317 + Str("status", status). 318 + Bool("active", event.Account.Active). 319 + Msg("profile watcher: account event") 320 + 321 + // Only act on terminal states. deactivated/suspended are reversible — 322 + // we keep the data so it reappears if the account comes back. 323 + if status == "deleted" || status == "takendown" { 324 + if err := pw.index.DeleteAllByDID(context.Background(), event.DID); err != nil { 325 + log.Error().Err(err).Str("did", event.DID).Str("status", status).Msg("profile watcher: failed to delete user data") 326 + return 327 + } 328 + log.Warn().Str("did", event.DID).Str("status", status).Msg("profile watcher: purged all data for account") 329 + pw.Unwatch(event.DID) 330 + } 281 331 } 282 332 }
+107
internal/firehose/profile_watcher_test.go
··· 1 + package firehose 2 + 3 + import ( 4 + "context" 5 + "testing" 6 + "time" 7 + 8 + "github.com/stretchr/testify/assert" 9 + ) 10 + 11 + func TestProfileWatcher_AccountDeleted_PurgesData(t *testing.T) { 12 + tmpDir := t.TempDir() 13 + idx, err := NewFeedIndex(tmpDir+"/test.db", 1*time.Hour) 14 + assert.NoError(t, err) 15 + defer idx.Close() 16 + 17 + ctx := context.Background() 18 + target := "did:plc:victim" 19 + 20 + bean := []byte(`{"$type":"social.arabica.alpha.bean","name":"Bean","createdAt":"2025-01-01T00:00:00Z"}`) 21 + assert.NoError(t, idx.UpsertRecord(ctx, target, "social.arabica.alpha.bean", "b1", "cid1", bean, time.Now().Unix())) 22 + assert.Equal(t, 1, idx.RecordCount()) 23 + 24 + pw := &ProfileWatcher{ 25 + index: idx, 26 + watchedDIDs: map[string]struct{}{target: {}}, 27 + stopCh: make(chan struct{}), 28 + } 29 + 30 + event := []byte(`{ 31 + "did":"did:plc:victim", 32 + "time_us":1700000000000000, 33 + "kind":"account", 34 + "account":{"active":false,"did":"did:plc:victim","seq":1,"status":"deleted","time":"2025-01-03T00:00:00Z"} 35 + }`) 36 + pw.processMessage(event) 37 + 38 + assert.Equal(t, 0, idx.RecordCount(), "records should be purged on account deletion") 39 + 40 + pw.watchedDIDsMu.RLock() 41 + _, present := pw.watchedDIDs[target] 42 + pw.watchedDIDsMu.RUnlock() 43 + assert.False(t, present, "DID should be unwatched after deletion") 44 + } 45 + 46 + func TestProfileWatcher_AccountDeactivated_KeepsData(t *testing.T) { 47 + tmpDir := t.TempDir() 48 + idx, err := NewFeedIndex(tmpDir+"/test.db", 1*time.Hour) 49 + assert.NoError(t, err) 50 + defer idx.Close() 51 + 52 + ctx := context.Background() 53 + target := "did:plc:victim" 54 + 55 + bean := []byte(`{"$type":"social.arabica.alpha.bean","name":"Bean","createdAt":"2025-01-01T00:00:00Z"}`) 56 + assert.NoError(t, idx.UpsertRecord(ctx, target, "social.arabica.alpha.bean", "b1", "cid1", bean, time.Now().Unix())) 57 + 58 + pw := &ProfileWatcher{ 59 + index: idx, 60 + watchedDIDs: 
map[string]struct{}{target: {}}, 61 + stopCh: make(chan struct{}), 62 + } 63 + 64 + event := []byte(`{ 65 + "did":"did:plc:victim", 66 + "time_us":1700000000000000, 67 + "kind":"account", 68 + "account":{"active":false,"did":"did:plc:victim","seq":1,"status":"deactivated","time":"2025-01-03T00:00:00Z"} 69 + }`) 70 + pw.processMessage(event) 71 + 72 + assert.Equal(t, 1, idx.RecordCount(), "deactivated accounts are reversible — keep data") 73 + 74 + pw.watchedDIDsMu.RLock() 75 + _, present := pw.watchedDIDs[target] 76 + pw.watchedDIDsMu.RUnlock() 77 + assert.True(t, present, "DID should remain watched on reversible status changes") 78 + } 79 + 80 + func TestProfileWatcher_AccountTakendown_PurgesData(t *testing.T) { 81 + tmpDir := t.TempDir() 82 + idx, err := NewFeedIndex(tmpDir+"/test.db", 1*time.Hour) 83 + assert.NoError(t, err) 84 + defer idx.Close() 85 + 86 + ctx := context.Background() 87 + target := "did:plc:victim" 88 + 89 + bean := []byte(`{"$type":"social.arabica.alpha.bean","name":"Bean","createdAt":"2025-01-01T00:00:00Z"}`) 90 + assert.NoError(t, idx.UpsertRecord(ctx, target, "social.arabica.alpha.bean", "b1", "cid1", bean, time.Now().Unix())) 91 + 92 + pw := &ProfileWatcher{ 93 + index: idx, 94 + watchedDIDs: map[string]struct{}{target: {}}, 95 + stopCh: make(chan struct{}), 96 + } 97 + 98 + event := []byte(`{ 99 + "did":"did:plc:victim", 100 + "time_us":1700000000000000, 101 + "kind":"account", 102 + "account":{"active":false,"did":"did:plc:victim","seq":1,"status":"takendown","time":"2025-01-03T00:00:00Z"} 103 + }`) 104 + pw.processMessage(event) 105 + 106 + assert.Equal(t, 0, idx.RecordCount(), "takendown accounts should be purged") 107 + }

History

1 round 0 comments
sign up or login to add to the discussion
pdewey.com submitted #0
1 commit
expand
feat: handle account delete and identity update events
merge conflicts detected
expand
  • internal/firehose/consumer.go:27
  • internal/firehose/index.go:459
  • internal/firehose/index_test.go:609
  • internal/firehose/profile_watcher.go:68
expand 0 comments