A social RSS reader built on the AT Protocol. glean.at
glean atproto atmosphere rss feed social app
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

Allow re-updates for existing subscriptions

+165 -54
+71 -42
internal/atproto/stream_handler.go
··· 121 121 func (h *StreamDBHandler) handleAnnotation(ctx context.Context, event *Event) error { 122 122 switch event.Type { 123 123 case actionCreate: 124 - var rec AnnotationRecord 125 - if err := json.Unmarshal(event.Value, &rec); err != nil { 124 + a, err := parseAnnotationRecord(event) 125 + if err != nil || a == nil { 126 126 return err 127 127 } 128 - if rec.FeedURL == "" || rec.ArticleURL == "" { 129 - return nil 130 - } 128 + return h.articles.CreateAnnotation(ctx, a) 131 129 132 - t, _ := time.Parse(time.RFC3339, rec.CreatedAt) 133 - a := &db.Annotation{ 134 - URI: event.URI, 135 - AuthorDID: event.DID, 136 - FeedURL: rec.FeedURL, 137 - ArticleURL: rec.ArticleURL, 138 - Quote: db.NullStr(rec.Quote), 139 - Note: db.NullStr(rec.Note), 140 - Tags: db.NullStrTags(rec.Tags), 141 - CreatedAt: sql.NullTime{Time: t, Valid: true}, 142 - CID: sql.NullString{String: event.CID, Valid: event.CID != ""}, 130 + case actionUpdate: 131 + a, err := parseAnnotationRecord(event) 132 + if err != nil || a == nil { 133 + return err 143 134 } 144 - if rec.Rating > 0 { 145 - a.Rating = sql.NullInt64{Int64: int64(rec.Rating), Valid: true} 146 - } 147 - return h.articles.CreateAnnotation(ctx, a) 135 + return h.articles.UpdateAnnotation(ctx, a) 148 136 149 137 case actionDelete: 150 138 return h.articles.DeleteAnnotation(ctx, event.URI) ··· 152 140 return nil 153 141 } 154 142 143 + func parseAnnotationRecord(event *Event) (*db.Annotation, error) { 144 + var rec AnnotationRecord 145 + if err := json.Unmarshal(event.Value, &rec); err != nil { 146 + return nil, err 147 + } 148 + if rec.FeedURL == "" || rec.ArticleURL == "" { 149 + return nil, nil 150 + } 151 + 152 + t, _ := time.Parse(time.RFC3339, rec.CreatedAt) 153 + a := &db.Annotation{ 154 + URI: event.URI, 155 + AuthorDID: event.DID, 156 + FeedURL: rec.FeedURL, 157 + ArticleURL: rec.ArticleURL, 158 + Quote: db.NullStr(rec.Quote), 159 + Note: db.NullStr(rec.Note), 160 + Tags: db.NullStrTags(rec.Tags), 161 + CreatedAt: sql.NullTime{Time: t, Valid: true}, 162 + CID: sql.NullString{String: event.CID, Valid: event.CID != ""}, 163 + } 164 + if rec.Rating > 0 { 165 + a.Rating = sql.NullInt64{Int64: int64(rec.Rating), Valid: true} 166 + } 167 + return a, nil 168 + } 169 + 155 170 func (h *StreamDBHandler) handleFollow(ctx context.Context, event *Event) error { 156 171 switch event.Type { 157 172 case actionCreate: ··· 172 187 173 188 func (h *StreamDBHandler) handleMarginNote(ctx context.Context, event *Event) error { 174 189 switch event.Type { 175 - case actionCreate, actionUpdate: 176 - var rec MarginNoteRecord 177 - if err := json.Unmarshal(event.Value, &rec); err != nil { 190 + case actionCreate: 191 + a, err := h.parseMarginNoteRecord(ctx, event) 192 + if err != nil || a == nil { 178 193 return err 179 194 } 180 - 181 - articleURL, quote, note, tags := rec.ToAnnotation() 182 - if articleURL == "" { 183 - return nil 184 - } 195 + return h.articles.CreateAnnotation(ctx, a) 185 196 186 - feedURL := h.resolveFeedURL(ctx, articleURL) 187 - 188 - t, _ := time.Parse(time.RFC3339, rec.CreatedAt) 189 - a := &db.Annotation{ 190 - URI: event.URI, 191 - AuthorDID: event.DID, 192 - FeedURL: feedURL, 193 - ArticleURL: articleURL, 194 - Quote: db.NullStr(quote), 195 - Note: db.NullStr(note), 196 - Tags: db.NullStrTags(tags), 197 - CreatedAt: sql.NullTime{Time: t, Valid: true}, 198 - CID: sql.NullString{String: event.CID, Valid: event.CID != ""}, 197 + case actionUpdate: 198 + a, err := h.parseMarginNoteRecord(ctx, event) 199 + if err != nil || a == nil { 200 + return err 199 201 } 200 - return h.articles.CreateAnnotation(ctx, a) 202 + return h.articles.UpdateAnnotation(ctx, a) 201 203 202 204 case actionDelete: 203 205 } 204 206 return nil 207 + } 208 + 209 + func (h *StreamDBHandler) parseMarginNoteRecord(ctx context.Context, event *Event) (*db.Annotation, error) { 210 + var rec MarginNoteRecord 211 + if err := json.Unmarshal(event.Value, &rec); err != nil { 212 + return nil, err 213 + } 214 + 215 + articleURL, quote, note, tags := rec.ToAnnotation() 216 + if articleURL == "" { 217 + return nil, nil 218 + } 219 + 220 + feedURL := h.resolveFeedURL(ctx, articleURL) 221 + 222 + t, _ := time.Parse(time.RFC3339, rec.CreatedAt) 223 + return &db.Annotation{ 224 + URI: event.URI, 225 + AuthorDID: event.DID, 226 + FeedURL: feedURL, 227 + ArticleURL: articleURL, 228 + Quote: db.NullStr(quote), 229 + Note: db.NullStr(note), 230 + Tags: db.NullStrTags(tags), 231 + CreatedAt: sql.NullTime{Time: t, Valid: true}, 232 + CID: sql.NullString{String: event.CID, Valid: event.CID != ""}, 233 + }, nil 205 234 } 206 235 207 236 func (h *StreamDBHandler) handleSkyreaderSubscription(ctx context.Context, event *Event) error {
+57
internal/db/batch_test.go
··· 162 162 assert.Equal(t, s.URI.String, "at://existing") 163 163 } 164 164 165 + func TestBatchReconcileSubscriptions_UpdatesCategoryAndTitle(t *testing.T) { 166 + ctx := context.Background() 167 + dbs := setupTestDB(t) 168 + userDID := seedSubscriptionData(t, ctx, dbs) 169 + 170 + _ = dbs.Articles.UpsertFeed(ctx, &Feed{FeedURL: "https://a.com/feed.xml", Title: NullStr("Feed A")}) 171 + err := dbs.Articles.CreateSubscription(ctx, userDID, "https://a.com/feed.xml", "Feed A", "old-cat", "at://existing", "cid") 172 + assert.NilError(t, err) 173 + 174 + subs := []SubData{ 175 + {FeedURL: "https://a.com/feed.xml", Title: "New Title", Category: "new-cat", URI: "at://existing", CID: "cid"}, 176 + } 177 + err = dbs.Articles.BatchReconcileSubscriptions(ctx, userDID, subs) 178 + assert.NilError(t, err) 179 + 180 + s, err := dbs.Articles.GetSubscription(ctx, userDID, "https://a.com/feed.xml") 181 + assert.NilError(t, err) 182 + assert.Equal(t, s.Category.String, "new-cat") 183 + assert.Equal(t, s.FeedTitle, "New Title") 184 + 185 + f, err := dbs.Articles.GetFeed(ctx, "https://a.com/feed.xml") 186 + assert.NilError(t, err) 187 + assert.Equal(t, f.SubscriberCount, 1) 188 + } 189 + 190 + func TestCreateSubscription_UpdatesCategoryAndTitle(t *testing.T) { 191 + ctx := context.Background() 192 + dbs := setupTestDB(t) 193 + userDID := seedSubscriptionData(t, ctx, dbs) 194 + 195 + _ = dbs.Articles.UpsertFeed(ctx, &Feed{FeedURL: "https://a.com/feed.xml", Title: NullStr("Feed A")}) 196 + err := dbs.Articles.CreateSubscription(ctx, userDID, "https://a.com/feed.xml", "Feed A", "old-cat", "at://existing", "cid") 197 + assert.NilError(t, err) 198 + 199 + err = dbs.Articles.CreateSubscription(ctx, userDID, "https://a.com/feed.xml", "New Title", "new-cat", "at://existing", "cid2") 200 + assert.NilError(t, err) 201 + 202 + s, err := dbs.Articles.GetSubscription(ctx, userDID, "https://a.com/feed.xml") 203 + assert.NilError(t, err) 204 + assert.Equal(t, s.Category.String, "new-cat") 205 + assert.Equal(t, s.FeedTitle, "New Title") 206 + assert.Equal(t, s.CID.String, "cid2") 207 + } 208 + 209 + func TestCreateSubscription_ReturnsDuplicateWhenUnchanged(t *testing.T) { 210 + ctx := context.Background() 211 + dbs := setupTestDB(t) 212 + userDID := seedSubscriptionData(t, ctx, dbs) 213 + 214 + _ = dbs.Articles.UpsertFeed(ctx, &Feed{FeedURL: "https://a.com/feed.xml", Title: NullStr("Feed A")}) 215 + err := dbs.Articles.CreateSubscription(ctx, userDID, "https://a.com/feed.xml", "Feed A", "cat", "at://existing", "cid") 216 + assert.NilError(t, err) 217 + 218 + err = dbs.Articles.CreateSubscription(ctx, userDID, "https://a.com/feed.xml", "Feed A", "cat", "at://existing", "cid") 219 + assert.Equal(t, err, ErrDuplicateSubscription) 220 + } 221 + 165 222 func TestBatchCreateLikes_InsertsAll(t *testing.T) { 166 223 ctx := context.Background() 167 224 dbs := setupTestDB(t)
+29 -12
internal/db/feed.go
··· 121 121 122 122 func (s *ArticleStore) CreateSubscription(ctx context.Context, userDID, feedURL, title, category, uri, cid string) error { 123 123 existing, err := s.GetSubscription(ctx, userDID, feedURL) 124 - if err == nil && existing != nil { 125 - if !existing.URI.Valid || existing.URI.String == "" { 126 - return s.updateSubscriptionURI(ctx, userDID, feedURL, uri, cid) 127 - } 124 + if err != nil || existing == nil { 125 + return s.BatchReconcileSubscriptions(ctx, userDID, []SubData{{FeedURL: feedURL, Title: title, Category: category, URI: uri, CID: cid}}) 126 + } 127 + 128 + unchanged := existing.FeedTitle == title && existing.Category.String == category && existing.CID.String == cid 129 + if unchanged { 128 130 return ErrDuplicateSubscription 129 131 } 130 - return s.BatchReconcileSubscriptions(ctx, userDID, []SubData{{FeedURL: feedURL, Title: title, Category: category, URI: uri, CID: cid}}) 131 - } 132 132 133 - func (s *ArticleStore) updateSubscriptionURI(ctx context.Context, userDID, feedURL, uri, cid string) error { 134 - _, err := s.db.ExecContext(ctx, ` 135 - UPDATE articles.subscriptions SET uri = ?, cid = ? WHERE user_did = ? AND feed_url = ? 136 - `, uri, cid, userDID, feedURL) 133 + if !existing.URI.Valid || existing.URI.String == "" { 134 + _, err := s.db.ExecContext(ctx, ` 135 + UPDATE articles.subscriptions SET title = ?, category = ?, uri = ?, cid = ? WHERE user_did = ? AND feed_url = ? 136 + `, nilIfEmpty(title), nilIfEmpty(category), nilIfEmpty(uri), nilIfEmpty(cid), userDID, feedURL) 137 + return err 138 + } 139 + _, err = s.db.ExecContext(ctx, ` 140 + UPDATE articles.subscriptions SET title = ?, category = ?, cid = ? WHERE user_did = ? AND feed_url = ? 141 + `, nilIfEmpty(title), nilIfEmpty(category), nilIfEmpty(cid), userDID, feedURL) 137 142 return err 138 143 } 139 144 ··· 411 416 } 412 417 defer insertStmt.Close() 413 418 419 + backfillStmt, err := tx.PrepareContext(ctx, ` 420 + UPDATE articles.subscriptions SET title = ?, category = ?, uri = ?, cid = ? WHERE user_did = ? AND feed_url = ? 421 + `) 422 + if err != nil { 423 + return err 424 + } 425 + defer backfillStmt.Close() 426 + 414 427 updateStmt, err := tx.PrepareContext(ctx, ` 415 - UPDATE articles.subscriptions SET uri = ?, cid = ? WHERE user_did = ? AND feed_url = ? 428 + UPDATE articles.subscriptions SET title = ?, category = ?, cid = ? WHERE user_did = ? AND feed_url = ? 416 429 `) 417 430 if err != nil { 418 431 return err ··· 428 441 for _, sub := range subs { 429 442 if existingURI, ok := existing[sub.FeedURL]; ok { 430 443 if existingURI == "" && sub.URI != "" { 431 - if _, err := updateStmt.ExecContext(ctx, sub.URI, sub.CID, userDID, sub.FeedURL); err != nil { 444 + if _, err := backfillStmt.ExecContext(ctx, nilIfEmpty(sub.Title), nilIfEmpty(sub.Category), sub.URI, nilIfEmpty(sub.CID), userDID, sub.FeedURL); err != nil { 445 + return err 446 + } 447 + } else if existingURI == sub.URI { 448 + if _, err := updateStmt.ExecContext(ctx, nilIfEmpty(sub.Title), nilIfEmpty(sub.Category), nilIfEmpty(sub.CID), userDID, sub.FeedURL); err != nil { 432 449 return err 433 450 } 434 451 }
+8
internal/db/social.go
··· 39 39 return s.BatchCreateAnnotations(ctx, []*Annotation{a}) 40 40 } 41 41 42 + func (s *ArticleStore) UpdateAnnotation(ctx context.Context, a *Annotation) error { 43 + _, err := s.db.ExecContext(ctx, ` 44 + UPDATE articles.annotations SET quote = ?, note = ?, tags = ?, rating = ?, cid = ? 45 + WHERE uri = ? 46 + `, a.Quote, a.Note, a.Tags, a.Rating, a.CID, a.URI) 47 + return err 48 + } 49 + 42 50 func (s *ArticleStore) GetAnnotation(ctx context.Context, id int64) (*Annotation, error) { 43 51 a := &Annotation{} 44 52 err := s.db.QueryRowContext(ctx, `