A social RSS reader built on the AT Protocol. glean.at
glean atproto atmosphere rss feed social app
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

Recalculate subscriber counts instead of incremental updates

+75 -100
+7 -7
internal/atproto/lexicon_external.go
··· 91 91 } 92 92 93 93 type StandardPublicationRecord struct { 94 - BasicTheme json.RawMessage `json:"basicTheme,omitempty"` 95 - Name string `json:"name"` 96 - URL string `json:"url"` 97 - Description string `json:"description"` 98 - Icon json.RawMessage `json:"icon,omitempty"` 99 - Labels json.RawMessage `json:"labels,omitempty"` 100 - Preferences json.RawMessage `json:"preferences,omitempty"` 94 + BasicTheme json.RawMessage `json:"basicTheme,omitempty"` 95 + Name string `json:"name"` 96 + URL string `json:"url"` 97 + Description string `json:"description"` 98 + Icon json.RawMessage `json:"icon,omitempty"` 99 + Labels json.RawMessage `json:"labels,omitempty"` 100 + Preferences json.RawMessage `json:"preferences,omitempty"` 101 101 } 102 102 103 103 type StandardDocumentRecord struct {
+6
internal/atproto/sync.go
··· 36 36 if err := s.syncSubscriptions(ctx, userDID); err != nil { 37 37 s.logger.Error("sync subscriptions failed", "error", err, "did", userDID) 38 38 } 39 + // Recompute subscriber_count from subscriptions table rather than 40 + // maintaining it incrementally (done here to avoid drift from race conditions 41 + // between stream handler events and sync operations). 42 + if err := s.articles.RecountSubscriberCounts(ctx); err != nil { 43 + s.logger.Error("recount subscriber counts failed", "error", err, "did", userDID) 44 + } 39 45 if err := s.syncLikes(ctx, userDID); err != nil { 40 46 s.logger.Error("sync likes failed", "error", err, "did", userDID) 41 47 }
+1 -6
internal/cluster/jaccard_test.go
··· 455 455 engine := newTestEngine(dbs) 456 456 assert.NilError(t, engine.ComputeFollowDistances(ctx)) 457 457 458 - _, err := dbs.SQLDB().ExecContext(ctx, `UPDATE articles.feeds SET subscriber_count = 2 WHERE feed_url = 'https://a.com/feed'`) 459 - assert.NilError(t, err) 460 - _, err = dbs.SQLDB().ExecContext(ctx, `UPDATE articles.feeds SET subscriber_count = 2 WHERE feed_url = 'https://b.com/feed'`) 461 - assert.NilError(t, err) 462 - 463 - _, err = dbs.SQLDB().ExecContext(ctx, `INSERT INTO users (did) VALUES (?)`, "did:test:newuser") 458 + _, err := dbs.SQLDB().ExecContext(ctx, `INSERT INTO users (did) VALUES (?)`, "did:test:newuser") 464 459 assert.NilError(t, err) 465 460 466 461 recs, err := engine.ColdStartRecommendations(ctx, "did:test:newuser", 10)
+46
internal/db/batch_test.go
··· 107 107 } 108 108 err := dbs.Articles.BatchReconcileSubscriptions(ctx, userDID, subs) 109 109 assert.NilError(t, err) 110 + assert.NilError(t, dbs.Articles.RecountSubscriberCounts(ctx)) 110 111 111 112 subs2, err := dbs.Articles.ListSubscriptions(ctx, userDID, "", 10, 0) 112 113 assert.NilError(t, err) ··· 132 133 } 133 134 err = dbs.Articles.BatchReconcileSubscriptions(ctx, userDID, subs) 134 135 assert.NilError(t, err) 136 + assert.NilError(t, dbs.Articles.RecountSubscriberCounts(ctx)) 135 137 136 138 s, err := dbs.Articles.GetSubscription(ctx, userDID, "https://a.com/feed.xml") 137 139 assert.NilError(t, err) ··· 176 178 } 177 179 err = dbs.Articles.BatchReconcileSubscriptions(ctx, userDID, subs) 178 180 assert.NilError(t, err) 181 + assert.NilError(t, dbs.Articles.RecountSubscriberCounts(ctx)) 179 182 180 183 s, err := dbs.Articles.GetSubscription(ctx, userDID, "https://a.com/feed.xml") 181 184 assert.NilError(t, err) ··· 315 318 "https://c.com/feed.xml": true, 316 319 }) 317 320 assert.NilError(t, err) 321 + assert.NilError(t, dbs.Articles.RecountSubscriberCounts(ctx)) 318 322 319 323 list, err := dbs.Articles.ListSubscriptions(ctx, userDID, "", 10, 0) 320 324 assert.NilError(t, err) ··· 486 490 exists, err = dbs.Articles.HasLiked(ctx, "did:test:u1", "https://a.com/feed", "https://a.com/2") 487 491 assert.NilError(t, err) 488 492 assert.Equal(t, exists, false) 493 + } 494 + 495 + func TestDeleteSubscription_NoDecrementWhenNotFound(t *testing.T) { 496 + ctx := context.Background() 497 + dbs := setupTestDB(t) 498 + userDID := seedSubscriptionData(t, ctx, dbs) 499 + 500 + _ = dbs.Articles.UpsertFeed(ctx, &Feed{FeedURL: "https://a.com/feed.xml"}) 501 + assert.NilError(t, dbs.Articles.CreateSubscription(ctx, userDID, "https://a.com/feed.xml", "A", "", "at://uri", "cid")) 502 + 503 + assert.NilError(t, dbs.Articles.DeleteSubscription(ctx, userDID, "https://a.com/feed.xml")) 504 + 505 + assert.NilError(t, dbs.Articles.DeleteSubscription(ctx, userDID, "https://a.com/feed.xml")) 506 + } 507 + 508 + func TestRecountSubscriberCounts(t *testing.T) { 509 + ctx := context.Background() 510 + dbs := setupTestDB(t) 511 + user1 := seedSubscriptionData(t, ctx, dbs) 512 + 513 + _, err := dbs.SQLDB().ExecContext(ctx, `INSERT INTO users (did) VALUES (?)`, "did:test:u2") 514 + assert.NilError(t, err) 515 + 516 + _ = dbs.Articles.UpsertFeed(ctx, &Feed{FeedURL: "https://a.com/feed.xml"}) 517 + _ = dbs.Articles.UpsertFeed(ctx, &Feed{FeedURL: "https://b.com/feed.xml"}) 518 + 519 + assert.NilError(t, dbs.Articles.CreateSubscription(ctx, user1, "https://a.com/feed.xml", "A", "", "", "")) 520 + assert.NilError(t, dbs.Articles.CreateSubscription(ctx, user1, "https://b.com/feed.xml", "B", "", "", "")) 521 + assert.NilError(t, dbs.Articles.CreateSubscription(ctx, "did:test:u2", "https://a.com/feed.xml", "A", "", "", "")) 522 + 523 + _, err = dbs.SQLDB().ExecContext(ctx, `UPDATE articles.feeds SET subscriber_count = 99 WHERE feed_url = 'https://a.com/feed.xml'`) 524 + assert.NilError(t, err) 525 + 526 + assert.NilError(t, dbs.Articles.RecountSubscriberCounts(ctx)) 527 + 528 + f, err := dbs.Articles.GetFeed(ctx, "https://a.com/feed.xml") 529 + assert.NilError(t, err) 530 + assert.Equal(t, f.SubscriberCount, 2) 531 + 532 + f, err = dbs.Articles.GetFeed(ctx, "https://b.com/feed.xml") 533 + assert.NilError(t, err) 534 + assert.Equal(t, f.SubscriberCount, 1) 489 535 } 490 536 491 537 func TestDeleteOrphanedAnnotations_RemovesOrphaned(t *testing.T) {
+15 -87
internal/db/feed.go
··· 4 4 "context" 5 5 "database/sql" 6 6 "errors" 7 - "strings" 8 7 "time" 9 8 10 9 "pkg.rbrt.fr/glean/internal/feed" ··· 112 111 return err 113 112 } 114 113 115 - func (s *ArticleStore) decrementSubscriberCount(ctx context.Context, feedURL string) error { 116 - _, err := s.db.ExecContext(ctx, ` 117 - UPDATE articles.feeds SET subscriber_count = MAX(subscriber_count - 1, 0) WHERE feed_url = ? 118 - `, feedURL) 119 - return err 120 - } 121 - 122 - func (s *ArticleStore) incrementSubscriberCount(ctx context.Context, feedURL string) error { 123 - _, err := s.db.ExecContext(ctx, ` 124 - UPDATE articles.feeds SET subscriber_count = subscriber_count + 1 WHERE feed_url = ? 125 - `, feedURL) 126 - return err 127 - } 128 - 129 114 func (s *ArticleStore) CreateSubscription(ctx context.Context, userDID, feedURL, title, category, uri, cid string) error { 130 115 existing, err := s.GetSubscription(ctx, userDID, feedURL) 131 116 if err != nil || existing == nil { 132 - result, err := s.db.ExecContext(ctx, ` 117 + _, err := s.db.ExecContext(ctx, ` 133 118 INSERT OR IGNORE INTO articles.subscriptions (user_did, feed_url, title, category, uri, cid) 134 119 VALUES (?, ?, ?, ?, ?, ?) 135 120 `, userDID, feedURL, nilIfEmpty(title), nilIfEmpty(category), nilIfEmpty(uri), nilIfEmpty(cid)) 136 - if err != nil { 137 - return err 138 - } 139 - if n, _ := result.RowsAffected(); n > 0 { 140 - return s.incrementSubscriberCount(ctx, feedURL) 141 - } 142 - return nil 121 + return err 143 122 } 144 123 145 124 unchanged := existing.FeedTitle == title && existing.Category.String == category && existing.CID.String == cid ··· 170 149 _, err := s.db.ExecContext(ctx, ` 171 150 DELETE FROM articles.subscriptions WHERE user_did = ? AND feed_url = ? 172 151 `, userDID, feedURL) 173 - if err != nil { 174 - return err 175 - } 176 - return s.decrementSubscriberCount(ctx, feedURL) 152 + return err 177 153 } 178 154 179 155 func (s *ArticleStore) DeleteAllSubscriptions(ctx context.Context, userDID string) error { 180 - tx, err := s.db.BeginTx(ctx, nil) 181 - if err != nil { 182 - return err 183 - } 184 - defer tx.Rollback() 185 - 186 - rows, err := tx.QueryContext(ctx, `SELECT feed_url FROM articles.subscriptions WHERE user_did = ?`, userDID) 187 - if err != nil { 188 - return err 189 - } 190 - var feedURLs []string 191 - for rows.Next() { 192 - var u string 193 - if err := rows.Scan(&u); err != nil { 194 - rows.Close() 195 - return err 196 - } 197 - feedURLs = append(feedURLs, u) 198 - } 199 - rows.Close() 200 - 201 - _, err = tx.ExecContext(ctx, `DELETE FROM articles.subscriptions WHERE user_did = ?`, userDID) 202 - if err != nil { 203 - return err 204 - } 205 - 206 - if len(feedURLs) > 0 { 207 - ph := make([]string, len(feedURLs)) 208 - args := make([]any, len(feedURLs)) 209 - for i, u := range feedURLs { 210 - ph[i] = "?" 211 - args[i] = u 212 - } 213 - _, err = tx.ExecContext(ctx, ` 214 - UPDATE articles.feeds SET subscriber_count = MAX(subscriber_count - 1, 0) 215 - WHERE feed_url IN (`+strings.Join(ph, ",")+`) 216 - `, args...) 217 - if err != nil { 218 - return err 219 - } 220 - } 221 - 222 - return tx.Commit() 156 + _, err := s.db.ExecContext(ctx, `DELETE FROM articles.subscriptions WHERE user_did = ?`, userDID) 157 + return err 223 158 } 224 159 225 160 func (s *ArticleStore) GetSubscriptionByURI(ctx context.Context, userDID, uri string) (*Subscription, error) { ··· 293 228 return count, err 294 229 } 295 230 231 + func (s *ArticleStore) RecountSubscriberCounts(ctx context.Context) error { 232 + _, err := s.db.ExecContext(ctx, ` 233 + UPDATE articles.feeds SET subscriber_count = ( 234 + SELECT COUNT(*) FROM articles.subscriptions sub WHERE sub.feed_url = articles.feeds.feed_url 235 + ) 236 + `) 237 + return err 238 + } 239 + 296 240 func (s *ArticleStore) GetCategories(ctx context.Context, userDID string) ([]string, error) { 297 241 rows, err := s.db.QueryContext(ctx, ` 298 242 SELECT DISTINCT category FROM articles.subscriptions ··· 449 393 } 450 394 defer updateStmt.Close() 451 395 452 - incrStmt, err := tx.PrepareContext(ctx, `UPDATE articles.feeds SET subscriber_count = subscriber_count + 1 WHERE feed_url = ?`) 453 - if err != nil { 454 - return err 455 - } 456 - defer incrStmt.Close() 457 - 458 396 for _, sub := range subs { 459 397 if existingURI, ok := existing[sub.FeedURL]; ok { 460 398 if existingURI == "" && sub.URI != "" { ··· 468 406 } 469 407 continue 470 408 } 471 - result, err := insertStmt.ExecContext(ctx, userDID, sub.FeedURL, nilIfEmpty(sub.Title), sub.Category, nilIfEmpty(sub.URI), nilIfEmpty(sub.CID)) 472 - if err != nil { 409 + if _, err := insertStmt.ExecContext(ctx, userDID, sub.FeedURL, nilIfEmpty(sub.Title), sub.Category, nilIfEmpty(sub.URI), nilIfEmpty(sub.CID)); err != nil { 473 410 return err 474 - } 475 - n, _ := result.RowsAffected() 476 - if n > 0 { 477 - if _, err := incrStmt.ExecContext(ctx, sub.FeedURL); err != nil { 478 - return err 479 - } 480 411 } 481 412 } 482 413 return tx.Commit() ··· 541 472 542 473 for _, feedURL := range toDelete { 543 474 if _, err := tx.ExecContext(ctx, `DELETE FROM articles.subscriptions WHERE user_did = ? AND feed_url = ?`, userDID, feedURL); err != nil { 544 - return err 545 - } 546 - if _, err := tx.ExecContext(ctx, `UPDATE articles.feeds SET subscriber_count = MAX(subscriber_count - 1, 0) WHERE feed_url = ?`, feedURL); err != nil { 547 475 return err 548 476 } 549 477 }