A social RSS reader built on the AT Protocol. glean.at
glean atproto atmosphere rss feed social app
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add backfill support for missing PDS records

+147 -1
+34 -1
internal/atproto/sync.go
··· 125 125 if err := s.articles.BatchReconcileSubscriptions(ctx, userDID, subs); err != nil { 126 126 return err 127 127 } 128 - return s.articles.DeleteOrphanedSubscriptions(ctx, userDID, activeFeedURLs) 128 + if err := s.articles.DeleteOrphanedSubscriptions(ctx, userDID, activeFeedURLs); err != nil { 129 + return err 130 + } 131 + return s.backfillMissingPDSRecords(ctx, userDID) 129 132 } 130 133 131 134 func (s *Sync) syncLikes(ctx context.Context, userDID string) error { ··· 275 278 276 279 return s.users.SyncFollows(ctx, userDID, activeFollows) 277 280 } 281 + 282 + func (s *Sync) backfillMissingPDSRecords(ctx context.Context, userDID string) error { 283 + subs, err := s.articles.ListSubscriptionsWithoutURI(ctx, userDID) 284 + if err != nil { 285 + return fmt.Errorf("list subscriptions without URI: %w", err) 286 + } 287 + if len(subs) == 0 { 288 + return nil 289 + } 290 + 291 + for _, sub := range subs { 292 + record := SubscriptionRecord{ 293 + CreatedAt: time.Now().Format(time.RFC3339), 294 + FeedURL: sub.FeedURL, 295 + Title: sub.Title, 296 + Category: sub.Category, 297 + } 298 + uri, cid, err := s.client.CreateRecord(ctx, userDID, CollectionSubscription, record) 299 + if err != nil { 300 + s.logger.Error("failed to backfill PDS record", "error", err, "url", sub.FeedURL) 301 + continue 302 + } 303 + if err := s.articles.UpdateSubscriptionURI(ctx, userDID, sub.FeedURL, uri, cid); err != nil { 304 + s.logger.Error("failed to backfill subscription URI", "error", err, "url", sub.FeedURL) 305 + continue 306 + } 307 + s.logger.Info("backfilled PDS record", "url", sub.FeedURL, "uri", uri) 308 + } 309 + return nil 310 + }
+86
internal/db/batch_test.go
··· 378 378 assert.Equal(t, len(list), 0) 379 379 } 380 380 381 + func TestListSubscriptionsWithoutURI_ReturnsOnlyLocal(t *testing.T) { 382 + ctx := context.Background() 383 + dbs := setupTestDB(t) 384 + userDID := seedSubscriptionData(t, ctx, dbs) 385 + 386 + _ = dbs.Articles.UpsertFeed(ctx, &Feed{FeedURL: "https://a.com/feed.xml"}) 387 + _ = dbs.Articles.UpsertFeed(ctx, &Feed{FeedURL: "https://b.com/feed.xml"}) 388 + _ = dbs.Articles.UpsertFeed(ctx, &Feed{FeedURL: "https://c.com/feed.xml"}) 389 + 390 + assert.NilError(t, dbs.Articles.CreateSubscription(ctx, userDID, "https://a.com/feed.xml", "A", "cat", "", "")) 391 + assert.NilError(t, dbs.Articles.CreateSubscription(ctx, userDID, "https://b.com/feed.xml", "B", "", "at://uri-b", "cid")) 392 + assert.NilError(t, dbs.Articles.CreateSubscription(ctx, userDID, "https://c.com/feed.xml", "C", "other", "", "")) 393 + 394 + subs, err := dbs.Articles.ListSubscriptionsWithoutURI(ctx, userDID) 395 + assert.NilError(t, err) 396 + assert.Equal(t, len(subs), 2) 397 + 398 + urls := map[string]bool{} 399 + for _, s := range subs { 400 + urls[s.FeedURL] = true 401 + } 402 + assert.Assert(t, urls["https://a.com/feed.xml"]) 403 + assert.Assert(t, urls["https://c.com/feed.xml"]) 404 + 405 + var aSub, cSub SubData 406 + for _, s := range subs { 407 + if s.FeedURL == "https://a.com/feed.xml" { 408 + aSub = s 409 + } 410 + if s.FeedURL == "https://c.com/feed.xml" { 411 + cSub = s 412 + } 413 + } 414 + assert.Equal(t, aSub.Title, "A") 415 + assert.Equal(t, aSub.Category, "cat") 416 + assert.Equal(t, cSub.Title, "C") 417 + assert.Equal(t, cSub.Category, "other") 418 + } 419 + 420 + func TestListSubscriptionsWithoutURI_EmptyWhenAllHaveURI(t *testing.T) { 421 + ctx := context.Background() 422 + dbs := setupTestDB(t) 423 + userDID := seedSubscriptionData(t, ctx, dbs) 424 + 425 + _ = dbs.Articles.UpsertFeed(ctx, &Feed{FeedURL: "https://a.com/feed.xml"}) 426 + assert.NilError(t, dbs.Articles.CreateSubscription(ctx, userDID, "https://a.com/feed.xml", "A", "", "at://uri", "cid")) 427 + 428 + subs, err := dbs.Articles.ListSubscriptionsWithoutURI(ctx, userDID) 429 + assert.NilError(t, err) 430 + assert.Equal(t, len(subs), 0) 431 + } 432 + 433 + func TestUpdateSubscriptionURI_UpdatesSubscription(t *testing.T) { 434 + ctx := context.Background() 435 + dbs := setupTestDB(t) 436 + userDID := seedSubscriptionData(t, ctx, dbs) 437 + 438 + _ = dbs.Articles.UpsertFeed(ctx, &Feed{FeedURL: "https://a.com/feed.xml"}) 439 + assert.NilError(t, dbs.Articles.CreateSubscription(ctx, userDID, "https://a.com/feed.xml", "A", "cat", "", "")) 440 + 441 + assert.NilError(t, dbs.Articles.UpdateSubscriptionURI(ctx, userDID, "https://a.com/feed.xml", "at://new-uri", "new-cid")) 442 + 443 + s, err := dbs.Articles.GetSubscription(ctx, userDID, "https://a.com/feed.xml") 444 + assert.NilError(t, err) 445 + assert.Equal(t, s.URI.String, "at://new-uri") 446 + assert.Equal(t, s.CID.String, "new-cid") 447 + assert.Equal(t, s.FeedTitle, "A") 448 + assert.Equal(t, s.Category.String, "cat") 449 + } 450 + 451 + func TestUpdateSubscriptionURI_NoOverwriteIfExists(t *testing.T) { 452 + ctx := context.Background() 453 + dbs := setupTestDB(t) 454 + userDID := seedSubscriptionData(t, ctx, dbs) 455 + 456 + _ = dbs.Articles.UpsertFeed(ctx, &Feed{FeedURL: "https://a.com/feed.xml"}) 457 + assert.NilError(t, dbs.Articles.CreateSubscription(ctx, userDID, "https://a.com/feed.xml", "A", "", "at://original", "cid1")) 458 + 459 + assert.NilError(t, dbs.Articles.UpdateSubscriptionURI(ctx, userDID, "https://a.com/feed.xml", "at://new-uri", "cid2")) 460 + 461 + s, err := dbs.Articles.GetSubscription(ctx, userDID, "https://a.com/feed.xml") 462 + assert.NilError(t, err) 463 + assert.Equal(t, s.URI.String, "at://new-uri") 464 + assert.Equal(t, s.CID.String, "cid2") 465 + } 466 + 381 467 func TestDeleteOrphanedLikes_RemovesOrphaned(t *testing.T) { 382 468 ctx := context.Background() 383 469 dbs := setupTestDB(t)
+27
internal/db/feed.go
··· 482 482 return tx.Commit() 483 483 } 484 484 485 + func (s *ArticleStore) ListSubscriptionsWithoutURI(ctx context.Context, userDID string) ([]SubData, error) { 486 + rows, err := s.db.QueryContext(ctx, 487 + `SELECT feed_url, COALESCE(title, ''), COALESCE(category, '') FROM articles.subscriptions WHERE user_did = ? AND (uri IS NULL OR uri = '')`, 488 + userDID) 489 + if err != nil { 490 + return nil, err 491 + } 492 + defer rows.Close() 493 + 494 + var subs []SubData 495 + for rows.Next() { 496 + var sub SubData 497 + if err := rows.Scan(&sub.FeedURL, &sub.Title, &sub.Category); err != nil { 498 + return nil, err 499 + } 500 + subs = append(subs, sub) 501 + } 502 + return subs, rows.Err() 503 + } 504 + 505 + func (s *ArticleStore) UpdateSubscriptionURI(ctx context.Context, userDID, feedURL, uri, cid string) error { 506 + _, err := s.db.ExecContext(ctx, 507 + `UPDATE articles.subscriptions SET uri = ?, cid = ? WHERE user_did = ? AND feed_url = ?`, 508 + nilIfEmpty(uri), nilIfEmpty(cid), userDID, feedURL) 509 + return err 510 + } 511 + 485 512 func (s *ArticleStore) DeleteOrphanedSubscriptions(ctx context.Context, userDID string, activeFeedURLs map[string]bool) error { 486 513 rows, err := s.db.QueryContext(ctx, 487 514 `SELECT feed_url FROM articles.subscriptions WHERE user_did = ? AND uri IS NOT NULL AND uri != ''`,