···202202 return h.articles.UpdateAnnotation(ctx, a)
203203204204 case actionDelete:
205205+ // Margin notes are converted to annotations; their URI is tracked
206206+ // by sync so orphan cleanup handles deletion during backfill.
205207 }
206208 return nil
207209}
···252254 return err
253255254256 case actionDelete:
257257+ // Skyreader subscriptions are imported into glean subscriptions;
258258+ // sync's orphan cleanup handles deletion during backfill.
255259 }
256260 return nil
257261}
+73-70
internal/atproto/sync.go
···3333func (s *Sync) Run(ctx context.Context, userDID string) error {
3434 s.logger.Info("syncing from PDS", "did", userDID)
35353636- if err := s.syncCollection(ctx, userDID, CollectionSubscription, s.batchReconcileSubscriptions); err != nil {
3636+ if err := s.syncSubscriptions(ctx, userDID); err != nil {
3737 s.logger.Error("sync subscriptions failed", "error", err, "did", userDID)
3838 }
3939- if err := s.syncCollection(ctx, userDID, CollectionSkyreaderSubscription, s.batchReconcileSkyreaderSubscriptions); err != nil {
4040- s.logger.Error("sync skyreader subscriptions failed", "error", err, "did", userDID)
4141- }
4242- if err := s.syncCollection(ctx, userDID, CollectionLike, s.batchReconcileLikes); err != nil {
3939+ if err := s.syncLikes(ctx, userDID); err != nil {
4340 s.logger.Error("sync likes failed", "error", err, "did", userDID)
4441 }
4545- if err := s.syncCollection(ctx, userDID, CollectionAnnotation, s.batchReconcileAnnotations); err != nil {
4242+ if err := s.syncAnnotations(ctx, userDID); err != nil {
4643 s.logger.Error("sync annotations failed", "error", err, "did", userDID)
4744 }
4848- if err := s.syncCollection(ctx, userDID, CollectionMarginNote, s.batchReconcileMarginNotes); err != nil {
4949- s.logger.Error("sync margin notes failed", "error", err, "did", userDID)
5050- }
5145 if err := s.syncFollows(ctx, userDID); err != nil {
5246 s.logger.Error("sync follows failed", "error", err, "did", userDID)
5347 }
···5549 return nil
5650}
57515858-func (s *Sync) syncCollection(ctx context.Context, userDID, collection string, fn func(ctx context.Context, userDID string, records []Record) error) error {
5252+func (s *Sync) listRecords(ctx context.Context, userDID, collection string) ([]Record, error) {
5953 var allRecords []Record
6054 cursor := ""
6155 for {
6256 records, next, err := s.client.ListRecords(ctx, userDID, collection, 100, cursor)
6357 if err != nil {
6464- return err
5858+ return nil, err
6559 }
6660 allRecords = append(allRecords, records...)
6761 if next == "" || len(records) == 0 {
···6963 }
7064 cursor = next
7165 }
7272- if len(allRecords) == 0 {
7373- return nil
7474- }
7575- return fn(ctx, userDID, allRecords)
6666+ return allRecords, nil
7667}
77687878-func (s *Sync) batchReconcileSubscriptions(ctx context.Context, userDID string, records []Record) error {
6969+func (s *Sync) syncSubscriptions(ctx context.Context, userDID string) error {
7070+ gleanRecs, err := s.listRecords(ctx, userDID, CollectionSubscription)
7171+ if err != nil {
7272+ return err
7373+ }
7474+ skyRecs, err := s.listRecords(ctx, userDID, CollectionSkyreaderSubscription)
7575+ if err != nil {
7676+ return err
7777+ }
7878+7979 var feeds []*db.Feed
8080 var subs []db.SubData
8181+ activeFeedURLs := make(map[string]bool)
81828282- for _, r := range records {
8383+ for _, r := range gleanRecs {
8384 var rec SubscriptionRecord
8485 if err := json.Unmarshal(r.Value, &rec); err != nil {
8586 continue
···8788 if rec.FeedURL == "" {
8889 continue
8990 }
9191+ activeFeedURLs[rec.FeedURL] = true
9092 feeds = append(feeds, &db.Feed{FeedURL: rec.FeedURL, Title: db.NullStr(rec.Title)})
9193 subs = append(subs, db.SubData{
9294 FeedURL: rec.FeedURL,
···9799 })
98100 }
99101100100- if len(feeds) > 0 {
101101- if err := s.articles.BatchUpsertFeeds(ctx, feeds); err != nil {
102102- return fmt.Errorf("upsert feeds: %w", err)
103103- }
104104- }
105105- return s.articles.BatchReconcileSubscriptions(ctx, userDID, subs)
106106-}
107107-108108-func (s *Sync) batchReconcileSkyreaderSubscriptions(ctx context.Context, userDID string, records []Record) error {
109109- var feeds []*db.Feed
110110- var subs []db.SubData
111111-112112- for _, r := range records {
102102+ for _, r := range skyRecs {
113103 var rec SkyreaderSubscriptionRecord
114104 if err := json.Unmarshal(r.Value, &rec); err != nil {
115105 continue
···117107 if rec.FeedURL == "" {
118108 continue
119109 }
110110+ activeFeedURLs[rec.FeedURL] = true
120111 feeds = append(feeds, &db.Feed{FeedURL: rec.FeedURL, Title: db.NullStr(rec.Title), SiteURL: db.NullStr(rec.SiteURL)})
121112 subs = append(subs, db.SubData{
122113 FeedURL: rec.FeedURL,
···128119129120 if len(feeds) > 0 {
130121 if err := s.articles.BatchUpsertFeeds(ctx, feeds); err != nil {
131131- return fmt.Errorf("failed to upsert feeds: %w", err)
122122+ return fmt.Errorf("upsert feeds: %w", err)
132123 }
133124 }
134134-135135- return s.articles.BatchReconcileSubscriptions(ctx, userDID, subs)
125125+ if err := s.articles.BatchReconcileSubscriptions(ctx, userDID, subs); err != nil {
126126+ return err
127127+ }
128128+ return s.articles.DeleteOrphanedSubscriptions(ctx, userDID, activeFeedURLs)
136129}
137130138138-func (s *Sync) batchReconcileLikes(ctx context.Context, userDID string, records []Record) error {
139139- var likes []*db.Like
131131+func (s *Sync) syncLikes(ctx context.Context, userDID string) error {
132132+ records, err := s.listRecords(ctx, userDID, CollectionLike)
133133+ if err != nil {
134134+ return err
135135+ }
140136137137+ var likes []*db.Like
138138+ activeURIs := make(map[string]bool)
141139 for _, r := range records {
142140 var rec LikeRecord
143141 if err := json.Unmarshal(r.Value, &rec); err != nil {
···146144 if rec.FeedURL == "" || rec.ArticleURL == "" {
147145 continue
148146 }
147147+ activeURIs[r.URI] = true
149148 t, _ := time.Parse(time.RFC3339, rec.CreatedAt)
150149 likes = append(likes, &db.Like{
151150 URI: r.URI,
···157156 })
158157 }
159158160160- return s.articles.BatchCreateLikes(ctx, likes)
159159+ if err := s.articles.BatchCreateLikes(ctx, likes); err != nil {
160160+ return err
161161+ }
162162+ return s.articles.DeleteOrphanedLikes(ctx, userDID, activeURIs)
161163}
162164163163-func (s *Sync) batchReconcileAnnotations(ctx context.Context, userDID string, records []Record) error {
165165+func (s *Sync) syncAnnotations(ctx context.Context, userDID string) error {
166166+ annRecs, err := s.listRecords(ctx, userDID, CollectionAnnotation)
167167+ if err != nil {
168168+ return err
169169+ }
170170+ marginRecs, err := s.listRecords(ctx, userDID, CollectionMarginNote)
171171+ if err != nil {
172172+ return err
173173+ }
174174+164175 var annotations []*db.Annotation
176176+ activeURIs := make(map[string]bool)
165177166166- for _, r := range records {
178178+ for _, r := range annRecs {
167179 var rec AnnotationRecord
168180 if err := json.Unmarshal(r.Value, &rec); err != nil {
169181 continue
···171183 if rec.FeedURL == "" || rec.ArticleURL == "" {
172184 continue
173185 }
186186+ activeURIs[r.URI] = true
174187 t, _ := time.Parse(time.RFC3339, rec.CreatedAt)
175188 a := &db.Annotation{
176189 URI: r.URI,
···189202 annotations = append(annotations, a)
190203 }
191204192192- return s.articles.BatchCreateAnnotations(ctx, annotations)
193193-}
194194-195195-func (s *Sync) batchReconcileMarginNotes(ctx context.Context, userDID string, records []Record) error {
196196- var annotations []*db.Annotation
197197-198198- for _, r := range records {
205205+ for _, r := range marginRecs {
199206 var rec MarginNoteRecord
200207 if err := json.Unmarshal(r.Value, &rec); err != nil {
201208 continue
···204211 if articleURL == "" {
205212 continue
206213 }
214214+ activeURIs[r.URI] = true
207215208216 feedURL := ""
209217 if article, err := s.articles.GetArticleByURL(ctx, articleURL); err == nil {
···224232 })
225233 }
226234227227- return s.articles.BatchCreateAnnotations(ctx, annotations)
235235+ if err := s.articles.BatchCreateAnnotations(ctx, annotations); err != nil {
236236+ return err
237237+ }
238238+ return s.articles.DeleteOrphanedAnnotations(ctx, userDID, activeURIs)
228239}
229240230241func (s *Sync) syncFollows(ctx context.Context, userDID string) error {
231242 activeFollows := make(map[string]db.Follow)
232243233244 for _, collection := range []string{CollectionBskyFollow, CollectionTangledFollow} {
234234- cursor := ""
235235- for {
236236- records, next, err := s.client.ListRecords(ctx, userDID, collection, 100, cursor)
237237- if err != nil {
238238- return err
245245+ records, err := s.listRecords(ctx, userDID, collection)
246246+ if err != nil {
247247+ return err
248248+ }
249249+250250+ for _, r := range records {
251251+ var rec FollowRecord
252252+ if err := json.Unmarshal(r.Value, &rec); err != nil {
253253+ continue
239254 }
240240-241241- for _, r := range records {
242242- var rec FollowRecord
243243- if err := json.Unmarshal(r.Value, &rec); err != nil {
244244- continue
245245- }
246246- if rec.Subject == "" {
247247- continue
248248- }
249249-250250- t, _ := time.Parse(time.RFC3339, rec.CreatedAt)
251251- activeFollows[rec.Subject] = db.Follow{
252252- URI: db.NullStr(r.URI),
253253- CID: db.NullStr(r.CID),
254254- FollowedAt: db.NullTime(t),
255255- }
255255+ if rec.Subject == "" {
256256+ continue
256257 }
257258258258- if next == "" || len(records) == 0 {
259259- break
259259+ t, _ := time.Parse(time.RFC3339, rec.CreatedAt)
260260+ activeFollows[rec.Subject] = db.Follow{
261261+ URI: db.NullStr(r.URI),
262262+ CID: db.NullStr(r.CID),
263263+ FollowedAt: db.NullTime(t),
260264 }
261261- cursor = next
262265 }
263266 }
264267
+1-4
internal/cluster/social.go
···1111func chunk[T any](s []T, size int) iter.Seq[[]T] {
1212 return func(yield func([]T) bool) {
1313 for i := 0; i < len(s); i += size {
1414- end := i + size
1515- if end > len(s) {
1616- end = len(s)
1717- }
1414+ end := min(i+size, len(s))
1815 if !yield(s[i:end]) {
1916 return
2017 }