A social RSS reader built on the AT Protocol. glean.at
glean atproto atmosphere rss feed social app
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

Split databases by domain

+871 -343
+1 -2
.gitignore
··· 23 23 .env 24 24 25 25 # database 26 - *.db 27 - *.db-* 26 + *.db* 28 27 29 28 # tailwind 30 29 static/output.css
+24 -11
docs/specs.md
··· 13 13 | Layer | Technology | 14 14 | ---------------- | --------------------------------------------------------------- | 15 15 | Backend | Go | 16 - | Database | SQLite (via `mattn/go-sqlite3`) | 16 + | Database | SQLite (3 files: users, articles, recs via `mattn/go-sqlite3`) | 17 17 | Frontend | htmx + TailwindCSS | 18 18 | Auth | AT Protocol OAuth / DID resolution (configurable PLC directory) | 19 19 | AT Protocol role | AppView for `at.glean.*` lexicons | ··· 475 475 476 476 ## 6. Database Schema (SQLite) 477 477 478 - ### 6.1 Users 478 + Glean uses three separate SQLite database files to reduce write-lock contention. Each is opened with its own connection pool: 479 + 480 + | File | Contents | ATTACH alias | 481 + |------|----------|-------------| 482 + | `<base>_users` | Users, follows, OAuth | `main` (primary) | 483 + | `<base>_articles` | Feeds, subscriptions, articles, read state, likes, annotations | `articles` | 484 + | `<base>_recs` | Similarity scores, impressions, dismissals, signal weights | `recs` | 485 + 486 + The users connection pool uses a custom SQLite driver with a `ConnectHook` that ATTACHes the articles and recs databases on every new connection. This allows the cluster engine to run cross-database queries using schema prefixes (`articles.subscriptions`, `recs.user_similarity`, `main.follows`). 487 + 488 + The articles and recs connections are independent pools used by the service layer for single-database operations. 489 + 490 + ### 6.1 Users (`<base>_users`) 479 491 480 492 ```sql 481 493 CREATE TABLE users ( ··· 488 500 ); 489 501 ``` 490 502 491 - ### 6.2 Feed Subscriptions 503 + ### 6.2 Feed Subscriptions (`<base>_articles`) 492 504 493 505 Indexed from `at.glean.subscription` records on user PDS. 494 506 ··· 509 521 CREATE INDEX idx_subscriptions_user ON subscriptions(user_did); 510 522 ``` 511 523 512 - ### 6.3 Feeds 524 + ### 6.3 Feeds (`<base>_articles`) 513 525 514 526 Master list of all known RSS feeds. 
515 527 ··· 533 545 ); 534 546 ``` 535 547 536 - ### 6.4 Articles 548 + ### 6.4 Articles (`<base>_articles`) 537 549 538 550 Fetched from RSS feeds. Only fetched for feeds that have local subscribers. 539 551 ··· 557 569 CREATE INDEX idx_articles_published ON articles(published DESC); 558 570 ``` 559 571 560 - ### 6.5 Read State 572 + ### 6.5 Read State (`<base>_articles`) 561 573 562 574 ```sql 563 575 CREATE TABLE read_state ( ··· 571 583 CREATE INDEX idx_read_state_unread ON read_state(user_did, is_read) WHERE is_read = 0; 572 584 ``` 573 585 574 - ### 6.6 Annotations, Likes 586 + ### 6.6 Annotations, Likes (`<base>_articles`) 575 587 576 588 Local mirror of AT Protocol lexicon records for fast querying. 577 589 ··· 602 614 ); 603 615 ``` 604 616 605 - ### 6.7 Cluster Precomputation 617 + ### 6.7 Cluster Precomputation (`<base>_recs`) 606 618 607 619 Stores precomputed similarity data to avoid recalculating on every request. 608 620 ··· 627 639 ); 628 640 ``` 629 641 630 - ### 6.8 Follows 642 + ### 6.8 Follows (`<base>_users`) 631 643 632 644 Tracks follow relationships between users (from `app.bsky.graph.follow` and `sh.tangled.graph.follow` records). 633 645 ··· 645 657 CREATE INDEX idx_follows_target ON follows(target_did); 646 658 ``` 647 659 648 - ### 6.9 OAuth Storage 660 + ### 6.9 OAuth Storage (`<base>_users`) 649 661 650 662 ```sql 651 663 CREATE TABLE oauth_auth_requests ( ··· 899 911 │ │ ├── sync.go # PDS record reconciliation 900 912 │ │ └── xrpc.go # XRPC query handlers (AppView endpoints) 901 913 │ ├── db/ 902 - │ │ ├── db.go # SQLite connection, migrations 914 + │ │ ├── db.go # SQLite connection, single-DB schema 915 + │ │ ├── multi.go # Multi-DB setup with ATTACH for cross-database queries 903 916 │ │ ├── user.go # User queries 904 917 │ │ ├── feed.go # Feed + subscription queries 905 918 │ │ ├── article.go # Article queries
+24 -30
internal/atproto/stream_handler.go
··· 12 12 ) 13 13 14 14 type StreamDBHandler struct { 15 - db *db.DB 16 - logger *slog.Logger 15 + articles *db.DB 16 + users *db.DB 17 + logger *slog.Logger 17 18 } 18 19 19 - func NewStreamDBHandler(database *db.DB, logger *slog.Logger) *StreamDBHandler { 20 - return &StreamDBHandler{db: database, logger: logger} 20 + func NewStreamDBHandler(articles, users *db.DB, logger *slog.Logger) *StreamDBHandler { 21 + return &StreamDBHandler{articles: articles, users: users, logger: logger} 21 22 } 22 23 23 24 func (h *StreamDBHandler) Handle(ctx context.Context, event *Event) error { ··· 49 50 return nil 50 51 } 51 52 52 - existing, err := h.db.GetSubscription(ctx, event.DID, rec.FeedURL) 53 + existing, err := h.articles.GetSubscription(ctx, event.DID, rec.FeedURL) 53 54 if err == nil && existing != nil { 54 55 if !existing.URI.Valid || existing.URI.String == "" { 55 - return h.db.UpdateSubscriptionURI(ctx, event.DID, rec.FeedURL, event.URI, event.CID) 56 + return h.articles.UpdateSubscriptionURI(ctx, event.DID, rec.FeedURL, event.URI, event.CID) 56 57 } 57 58 return nil 58 59 } 59 60 60 61 f := &db.Feed{FeedURL: rec.FeedURL, Title: db.NullStr(rec.Title)} 61 - _ = h.db.UpsertFeed(ctx, f) 62 - err = h.db.CreateSubscription(ctx, event.DID, rec.FeedURL, rec.Title, rec.Category, event.URI, event.CID) 62 + _ = h.articles.UpsertFeed(ctx, f) 63 + err = h.articles.CreateSubscription(ctx, event.DID, rec.FeedURL, rec.Title, rec.Category, event.URI, event.CID) 63 64 if errors.Is(err, db.ErrDuplicateSubscription) { 64 65 return nil 65 66 } ··· 70 71 if !ok { 71 72 return nil 72 73 } 73 - sub, err := h.db.GetSubscriptionByURI(ctx, event.DID, event.URI) 74 + sub, err := h.articles.GetSubscriptionByURI(ctx, event.DID, event.URI) 74 75 if err == nil && sub != nil { 75 - return h.db.DeleteSubscription(ctx, event.DID, sub.FeedURL) 76 + return h.articles.DeleteSubscription(ctx, event.DID, sub.FeedURL) 76 77 } 77 78 _ = parsed 78 79 } ··· 90 91 return nil 91 92 } 92 93 93 - exists, err := 
h.db.HasLiked(ctx, event.DID, rec.FeedURL, rec.ArticleURL) 94 + exists, err := h.articles.HasLiked(ctx, event.DID, rec.FeedURL, rec.ArticleURL) 94 95 if err != nil || exists { 95 96 return nil 96 97 } 97 98 98 99 t, _ := time.Parse(time.RFC3339, rec.CreatedAt) 99 - err = h.db.CreateLike(ctx, &db.Like{ 100 + err = h.articles.CreateLike(ctx, &db.Like{ 100 101 URI: event.URI, 101 102 AuthorDID: event.DID, 102 103 FeedURL: rec.FeedURL, ··· 110 111 return err 111 112 112 113 case "delete": 113 - return h.db.DeleteLike(ctx, event.URI) 114 + return h.articles.DeleteLike(ctx, event.URI) 114 115 } 115 116 return nil 116 117 } ··· 141 142 if rec.Rating > 0 { 142 143 a.Rating = sql.NullInt64{Int64: int64(rec.Rating), Valid: true} 143 144 } 144 - return h.db.CreateAnnotation(ctx, a) 145 + return h.articles.CreateAnnotation(ctx, a) 145 146 146 147 case "delete": 147 - return h.db.DeleteAnnotation(ctx, event.URI) 148 + return h.articles.DeleteAnnotation(ctx, event.URI) 148 149 } 149 150 return nil 150 151 } ··· 159 160 if rec.Subject == "" { 160 161 return nil 161 162 } 162 - return h.db.UpsertFollow(ctx, event.DID, rec.Subject, event.URI, event.CID) 163 + return h.users.UpsertFollow(ctx, event.DID, rec.Subject, event.URI, event.CID) 163 164 164 165 case "delete": 165 - return h.db.DeleteFollowByURI(ctx, event.URI) 166 + return h.users.DeleteFollowByURI(ctx, event.URI) 166 167 } 167 168 return nil 168 169 } ··· 194 195 CreatedAt: sql.NullTime{Time: t, Valid: true}, 195 196 CID: sql.NullString{String: event.CID, Valid: event.CID != ""}, 196 197 } 197 - return h.db.CreateAnnotation(ctx, a) 198 + return h.articles.CreateAnnotation(ctx, a) 198 199 199 200 case "delete": 200 - // TODO: I actually don't think we should delete an annotation on Glean if deleted from Margin 201 - // return h.db.DeleteAnnotation(ctx, event.URI) 202 201 } 203 202 return nil 204 203 } ··· 214 213 return nil 215 214 } 216 215 217 - existing, err := h.db.GetSubscription(ctx, event.DID, rec.FeedURL) 216 + 
existing, err := h.articles.GetSubscription(ctx, event.DID, rec.FeedURL) 218 217 if err == nil && existing != nil { 219 218 if !existing.URI.Valid || existing.URI.String == "" { 220 - return h.db.UpdateSubscriptionURI(ctx, event.DID, rec.FeedURL, event.URI, event.CID) 219 + return h.articles.UpdateSubscriptionURI(ctx, event.DID, rec.FeedURL, event.URI, event.CID) 221 220 } 222 221 return nil 223 222 } ··· 227 226 Title: db.NullStr(rec.Title), 228 227 SiteURL: db.NullStr(rec.SiteURL), 229 228 } 230 - _ = h.db.UpsertFeed(ctx, f) 231 - err = h.db.CreateSubscription(ctx, event.DID, rec.FeedURL, rec.Title, rec.Category, event.URI, event.CID) 229 + _ = h.articles.UpsertFeed(ctx, f) 230 + err = h.articles.CreateSubscription(ctx, event.DID, rec.FeedURL, rec.Title, rec.Category, event.URI, event.CID) 232 231 if errors.Is(err, db.ErrDuplicateSubscription) { 233 232 return nil 234 233 } 235 234 return err 236 235 237 236 case "delete": 238 - // TODO: I actually don't think we should delete an subscription on Glean if deleted from Skyreader 239 - // sub, err := h.db.GetSubscriptionByURI(ctx, event.DID, event.URI) 240 - // if err == nil && sub != nil { 241 - // return h.db.DeleteSubscription(ctx, event.DID, sub.FeedURL) 242 - // } 243 237 } 244 238 return nil 245 239 } 246 240 247 241 func (h *StreamDBHandler) resolveFeedURL(ctx context.Context, articleURL string) string { 248 - article, err := h.db.GetArticleByURL(ctx, articleURL) 242 + article, err := h.articles.GetArticleByURL(ctx, articleURL) 249 243 if err != nil { 250 244 return "" 251 245 }
+23 -22
internal/atproto/sync.go
··· 22 22 ) 23 23 24 24 type Sync struct { 25 - db *db.DB 26 - client *Client 27 - logger *slog.Logger 25 + articles *db.DB 26 + users *db.DB 27 + client *Client 28 + logger *slog.Logger 28 29 } 29 30 30 - func NewSync(database *db.DB, client *Client, logger *slog.Logger) *Sync { 31 - return &Sync{db: database, client: client, logger: logger} 31 + func NewSync(articles, users *db.DB, client *Client, logger *slog.Logger) *Sync { 32 + return &Sync{articles: articles, users: users, client: client, logger: logger} 32 33 } 33 34 34 35 func (s *Sync) Run(ctx context.Context, userDID string) error { ··· 91 92 } 92 93 93 94 f := &db.Feed{FeedURL: rec.FeedURL, Title: db.NullStr(rec.Title)} 94 - _ = s.db.UpsertFeed(ctx, f) 95 + _ = s.articles.UpsertFeed(ctx, f) 95 96 96 - existing, err := s.db.GetSubscription(ctx, userDID, rec.FeedURL) 97 + existing, err := s.articles.GetSubscription(ctx, userDID, rec.FeedURL) 97 98 if err == nil && existing != nil { 98 99 if !existing.URI.Valid || existing.URI.String == "" { 99 - return s.db.UpdateSubscriptionURI(ctx, userDID, rec.FeedURL, uri, cid) 100 + return s.articles.UpdateSubscriptionURI(ctx, userDID, rec.FeedURL, uri, cid) 100 101 } 101 102 return nil 102 103 } 103 104 104 - err = s.db.CreateSubscription(ctx, userDID, rec.FeedURL, rec.Title, rec.Category, uri, cid) 105 + err = s.articles.CreateSubscription(ctx, userDID, rec.FeedURL, rec.Title, rec.Category, uri, cid) 105 106 if errors.Is(err, db.ErrDuplicateSubscription) { 106 107 return nil 107 108 } ··· 119 120 } 120 121 121 122 f := &db.Feed{FeedURL: rec.FeedURL, Title: db.NullStr(rec.Title), SiteURL: db.NullStr(rec.SiteURL)} 122 - _ = s.db.UpsertFeed(ctx, f) 123 + _ = s.articles.UpsertFeed(ctx, f) 123 124 124 - existing, err := s.db.GetSubscription(ctx, userDID, rec.FeedURL) 125 + existing, err := s.articles.GetSubscription(ctx, userDID, rec.FeedURL) 125 126 if err == nil && existing != nil { 126 127 if !existing.URI.Valid || existing.URI.String == "" { 127 - return 
s.db.UpdateSubscriptionURI(ctx, userDID, rec.FeedURL, uri, cid) 128 + return s.articles.UpdateSubscriptionURI(ctx, userDID, rec.FeedURL, uri, cid) 128 129 } 129 130 return nil 130 131 } 131 132 132 - err = s.db.CreateSubscription(ctx, userDID, rec.FeedURL, rec.Title, "", uri, cid) 133 + err = s.articles.CreateSubscription(ctx, userDID, rec.FeedURL, rec.Title, "", uri, cid) 133 134 if errors.Is(err, db.ErrDuplicateSubscription) { 134 135 return nil 135 136 } ··· 146 147 return nil 147 148 } 148 149 149 - exists, err := s.db.HasLiked(ctx, userDID, rec.FeedURL, rec.ArticleURL) 150 + exists, err := s.articles.HasLiked(ctx, userDID, rec.FeedURL, rec.ArticleURL) 150 151 if err != nil { 151 152 return err 152 153 } ··· 163 164 CreatedAt: db.NullTime(t), 164 165 CID: db.NullStr(cid), 165 166 } 166 - err = s.db.CreateLike(ctx, like) 167 + err = s.articles.CreateLike(ctx, like) 167 168 if errors.Is(err, db.ErrDuplicateLike) { 168 169 return nil 169 170 } ··· 180 181 return nil 181 182 } 182 183 183 - exists, err := s.db.AnnotationExists(ctx, uri) 184 + exists, err := s.articles.AnnotationExists(ctx, uri) 184 185 if err != nil || exists { 185 186 return err 186 187 } ··· 200 201 if rec.Rating > 0 { 201 202 a.Rating = db.NullInt(int64(rec.Rating)) 202 203 } 203 - return s.db.CreateAnnotation(ctx, a) 204 + return s.articles.CreateAnnotation(ctx, a) 204 205 } 205 206 206 207 func (s *Sync) reconcileMarginNote(ctx context.Context, userDID, uri, cid string, value json.RawMessage) error { ··· 214 215 return nil 215 216 } 216 217 217 - exists, err := s.db.AnnotationExists(ctx, uri) 218 + exists, err := s.articles.AnnotationExists(ctx, uri) 218 219 if err != nil || exists { 219 220 return err 220 221 } 221 222 222 223 feedURL := "" 223 - if article, err := s.db.GetArticleByURL(ctx, articleURL); err == nil { 224 + if article, err := s.articles.GetArticleByURL(ctx, articleURL); err == nil { 224 225 feedURL = article.FeedURL 225 226 } 226 227 ··· 236 237 CreatedAt: db.NullTime(t), 237 
238 CID: db.NullStr(cid), 238 239 } 239 - return s.db.CreateAnnotation(ctx, a) 240 + return s.articles.CreateAnnotation(ctx, a) 240 241 } 241 242 242 243 func (s *Sync) syncFollows(ctx context.Context, userDID string) error { ··· 274 275 avatarURL = avatar 275 276 } 276 277 277 - s.db.CreateUser(ctx, rec.Subject, handle, displayName, avatarURL) 278 + s.users.CreateUser(ctx, rec.Subject, handle, displayName, avatarURL) 278 279 } 279 280 280 281 if next == "" || len(records) == 0 { ··· 288 289 return nil 289 290 } 290 291 291 - return s.db.SyncFollows(ctx, userDID, activeFollows) 292 + return s.users.SyncFollows(ctx, userDID, activeFollows) 292 293 }
+8 -8
internal/cluster/dismiss.go
··· 12 12 13 13 func (e *Engine) DismissFeed(ctx context.Context, userDID, feedURL, reason string) error { 14 14 _, err := e.db.ExecContext(ctx, ` 15 - INSERT INTO dismissed_recommendations (user_did, target_type, target_id, reason) 15 + INSERT INTO recs.dismissed_recommendations (user_did, target_type, target_id, reason) 16 16 VALUES (?, 'feed', ?, ?) 17 17 ON CONFLICT(user_did, target_type, target_id) DO UPDATE SET reason = excluded.reason, dismissed_at = CURRENT_TIMESTAMP 18 18 `, userDID, feedURL, reason) ··· 21 21 22 22 func (e *Engine) DismissArticle(ctx context.Context, userDID, articleURL, reason string) error { 23 23 _, err := e.db.ExecContext(ctx, ` 24 - INSERT INTO dismissed_recommendations (user_did, target_type, target_id, reason) 24 + INSERT INTO recs.dismissed_recommendations (user_did, target_type, target_id, reason) 25 25 VALUES (?, 'article', ?, ?) 26 26 ON CONFLICT(user_did, target_type, target_id) DO UPDATE SET reason = excluded.reason, dismissed_at = CURRENT_TIMESTAMP 27 27 `, userDID, articleURL, reason) ··· 37 37 38 38 for _, imp := range impressions { 39 39 _, err := tx.ExecContext(ctx, ` 40 - INSERT INTO recommendation_impressions (user_did, target_type, target_id, first_shown_at, last_shown_at, shown_count) 40 + INSERT INTO recs.recommendation_impressions (user_did, target_type, target_id, first_shown_at, last_shown_at, shown_count) 41 41 VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 1) 42 42 ON CONFLICT(user_did, target_type, target_id) DO UPDATE SET 43 43 last_shown_at = CURRENT_TIMESTAMP, ··· 52 52 53 53 func (e *Engine) MarkImpressionActed(ctx context.Context, userDID, targetType, targetID string) error { 54 54 _, err := e.db.ExecContext(ctx, ` 55 - UPDATE recommendation_impressions SET acted = 1 55 + UPDATE recs.recommendation_impressions SET acted = 1 56 56 WHERE user_did = ? AND target_type = ? AND target_id = ? 
57 57 `, userDID, targetType, targetID) 58 58 return err ··· 62 62 cutoff := time.Now().AddDate(0, 0, -maxAgeDays).Format(time.RFC3339) 63 63 64 64 _, err := e.db.ExecContext(ctx, ` 65 - INSERT OR IGNORE INTO dismissed_recommendations (user_did, target_type, target_id, reason, dismissed_at) 65 + INSERT OR IGNORE INTO recs.dismissed_recommendations (user_did, target_type, target_id, reason, dismissed_at) 66 66 SELECT user_did, target_type, target_id, 'auto_stale', CURRENT_TIMESTAMP 67 - FROM recommendation_impressions 67 + FROM recs.recommendation_impressions 68 68 WHERE acted = 0 69 69 AND shown_count >= ? 70 70 AND first_shown_at < ? ··· 75 75 func (e *Engine) IsFeedDismissed(ctx context.Context, userDID, feedURL string) (bool, error) { 76 76 var count int 77 77 err := e.db.QueryRowContext(ctx, ` 78 - SELECT COUNT(1) FROM dismissed_recommendations 78 + SELECT COUNT(1) FROM recs.dismissed_recommendations 79 79 WHERE user_did = ? AND target_type = 'feed' AND target_id = ? 80 80 `, userDID, feedURL).Scan(&count) 81 81 return count > 0, err 82 - } 82 + }
+39 -39
internal/cluster/jaccard.go
··· 42 42 } 43 43 defer func() { _ = tx.Rollback() }() 44 44 45 - if _, err := tx.ExecContext(ctx, `DELETE FROM feed_similarity`); err != nil { 45 + if _, err := tx.ExecContext(ctx, `DELETE FROM recs.feed_similarity`); err != nil { 46 46 return err 47 47 } 48 48 49 49 _, err = tx.ExecContext(ctx, ` 50 - INSERT INTO feed_similarity (feed_a, feed_b, jaccard) 50 + INSERT INTO recs.feed_similarity (feed_a, feed_b, jaccard) 51 51 SELECT 52 52 s1.feed_url, 53 53 s2.feed_url, 54 54 CAST(COUNT(*) AS REAL) / (f1.subscriber_count + f2.subscriber_count - CAST(COUNT(*) AS REAL)) 55 - FROM subscriptions s1 56 - JOIN subscriptions s2 ON s1.user_did = s2.user_did AND s1.feed_url < s2.feed_url 57 - JOIN feeds f1 ON f1.feed_url = s1.feed_url 58 - JOIN feeds f2 ON f2.feed_url = s2.feed_url 55 + FROM articles.subscriptions s1 56 + JOIN articles.subscriptions s2 ON s1.user_did = s2.user_did AND s1.feed_url < s2.feed_url 57 + JOIN articles.feeds f1 ON f1.feed_url = s1.feed_url 58 + JOIN articles.feeds f2 ON f2.feed_url = s2.feed_url 59 59 GROUP BY s1.feed_url, s2.feed_url 60 60 `) 61 61 if err != nil { ··· 82 82 INSERT INTO _feed_words (feed_url, word) 83 83 WITH feed_tokens AS ( 84 84 SELECT feed_url, LOWER(TRIM(value)) AS word 85 - FROM feeds, 85 + FROM articles.feeds, 86 86 json_each('["' || REPLACE(LOWER(COALESCE(description, '')), ' ', '","') || '"]') 87 87 WHERE description IS NOT NULL AND description != '' 88 88 ) ··· 140 140 } 141 141 142 142 descInsert := ` 143 - INSERT OR IGNORE INTO feed_similarity (feed_a, feed_b, jaccard) 143 + INSERT OR IGNORE INTO recs.feed_similarity (feed_a, feed_b, jaccard) 144 144 SELECT feed_a, feed_b, 0 FROM _word_overlap 145 145 ` 146 146 if _, err := tx.ExecContext(ctx, descInsert); err != nil { ··· 148 148 } 149 149 150 150 descUpdate := fmt.Sprintf(` 151 - UPDATE feed_similarity SET 151 + UPDATE recs.feed_similarity SET 152 152 jaccard = jaccard + %g * CAST(_word_overlap.common AS REAL) / NULLIF( 153 - (SELECT cnt FROM _feed_word_counts WHERE 
feed_url = feed_similarity.feed_a) + 154 - (SELECT cnt FROM _feed_word_counts WHERE feed_url = feed_similarity.feed_b) - 153 + (SELECT cnt FROM _feed_word_counts WHERE feed_url = recs.feed_similarity.feed_a) + 154 + (SELECT cnt FROM _feed_word_counts WHERE feed_url = recs.feed_similarity.feed_b) - 155 155 CAST(_word_overlap.common AS REAL), 156 156 0 157 157 ) 158 158 FROM _word_overlap 159 - WHERE feed_similarity.feed_a = _word_overlap.feed_a 160 - AND feed_similarity.feed_b = _word_overlap.feed_b 159 + WHERE recs.feed_similarity.feed_a = _word_overlap.feed_a 160 + AND recs.feed_similarity.feed_b = _word_overlap.feed_b 161 161 `, e.config.DescriptionWeight) 162 162 163 163 if _, err := tx.ExecContext(ctx, descUpdate); err != nil { ··· 174 174 } 175 175 defer func() { _ = tx.Rollback() }() 176 176 177 - if _, err := tx.ExecContext(ctx, `DELETE FROM user_similarity`); err != nil { 177 + if _, err := tx.ExecContext(ctx, `DELETE FROM recs.user_similarity`); err != nil { 178 178 return err 179 179 } 180 180 181 181 _, err = tx.ExecContext(ctx, ` 182 - INSERT INTO user_similarity (user_a, user_b, jaccard, common_feeds) 182 + INSERT INTO recs.user_similarity (user_a, user_b, jaccard, common_feeds) 183 183 SELECT 184 184 s1.user_did, 185 185 s2.user_did, 186 186 CAST(COUNT(*) AS REAL) / ( 187 - (SELECT COUNT(*) FROM subscriptions WHERE user_did = s1.user_did) + 188 - (SELECT COUNT(*) FROM subscriptions WHERE user_did = s2.user_did) - 187 + (SELECT COUNT(*) FROM articles.subscriptions WHERE user_did = s1.user_did) + 188 + (SELECT COUNT(*) FROM articles.subscriptions WHERE user_did = s2.user_did) - 189 189 CAST(COUNT(*) AS REAL) 190 190 ), 191 191 COUNT(*) 192 - FROM subscriptions s1 193 - JOIN subscriptions s2 ON s1.feed_url = s2.feed_url AND s1.user_did < s2.user_did 192 + FROM articles.subscriptions s1 193 + JOIN articles.subscriptions s2 ON s1.feed_url = s2.feed_url AND s1.user_did < s2.user_did 194 194 GROUP BY s1.user_did, s2.user_did 195 195 `) 196 196 if err != nil 
{ ··· 207 207 } 208 208 if _, err := tx.ExecContext(ctx, ` 209 209 INSERT INTO _likes_count (author_did, cnt) 210 - SELECT author_did, COUNT(*) FROM likes GROUP BY author_did 210 + SELECT author_did, COUNT(*) FROM articles.likes GROUP BY author_did 211 211 `); err != nil { 212 212 return err 213 213 } ··· 227 227 EXP(-0.023 * CAST(julianday('now') - julianday(l1.created_at) AS REAL)) 228 228 * EXP(-0.023 * CAST(julianday('now') - julianday(l2.created_at) AS REAL)) 229 229 ) AS INTEGER) 230 - FROM likes l1 231 - JOIN likes l2 ON l1.feed_url = l2.feed_url AND l1.article_url = l2.article_url 230 + FROM articles.likes l1 231 + JOIN articles.likes l2 ON l1.feed_url = l2.feed_url AND l1.article_url = l2.article_url 232 232 AND l1.author_did < l2.author_did 233 233 WHERE l1.created_at IS NOT NULL AND l2.created_at IS NOT NULL 234 234 GROUP BY l1.author_did, l2.author_did ··· 237 237 } 238 238 239 239 likesUpdate := fmt.Sprintf(` 240 - UPDATE user_similarity SET 240 + UPDATE recs.user_similarity SET 241 241 jaccard = jaccard + %g * CAST(_likes_overlap.common AS REAL) / NULLIF( 242 - (SELECT cnt FROM _likes_count WHERE author_did = user_similarity.user_a) + 243 - (SELECT cnt FROM _likes_count WHERE author_did = user_similarity.user_b) - 242 + (SELECT cnt FROM _likes_count WHERE author_did = recs.user_similarity.user_a) + 243 + (SELECT cnt FROM _likes_count WHERE author_did = recs.user_similarity.user_b) - 244 244 CAST(_likes_overlap.common AS REAL), 245 245 0 246 246 ), 247 247 common_likes = _likes_overlap.common 248 248 FROM _likes_overlap 249 - WHERE user_similarity.user_a = _likes_overlap.user_a 250 - AND user_similarity.user_b = _likes_overlap.user_b 249 + WHERE recs.user_similarity.user_a = _likes_overlap.user_a 250 + AND recs.user_similarity.user_b = _likes_overlap.user_b 251 251 `, e.config.LikesWeight) 252 252 253 253 if _, err := tx.ExecContext(ctx, likesUpdate); err != nil { ··· 255 255 } 256 256 257 257 likesInsert := fmt.Sprintf(` 258 - INSERT INTO 
user_similarity (user_a, user_b, jaccard, common_feeds, common_likes) 258 + INSERT INTO recs.user_similarity (user_a, user_b, jaccard, common_feeds, common_likes) 259 259 SELECT sub.user_a, sub.user_b, sub.jaccard, 0, sub.common 260 260 FROM ( 261 261 SELECT ··· 289 289 _, err = tx.ExecContext(ctx, ` 290 290 INSERT INTO _tag_overlap (user_a, user_b, common) 291 291 WITH user_tags AS ( 292 - SELECT author_did, TRIM(value) AS tag FROM annotations, json_each('["' || REPLACE(tags, ',', '","') || '"]') 292 + SELECT author_did, TRIM(value) AS tag FROM articles.annotations, json_each('["' || REPLACE(tags, ',', '","') || '"]') 293 293 WHERE tags IS NOT NULL AND tags != '' 294 294 ) 295 295 SELECT t1.author_did, t2.author_did, COUNT(DISTINCT t1.tag) ··· 312 312 if _, err := tx.ExecContext(ctx, ` 313 313 INSERT INTO _tag_count (author_did, cnt) 314 314 WITH user_tags AS ( 315 - SELECT author_did, TRIM(value) AS tag FROM annotations, json_each('["' || REPLACE(tags, ',', '","') || '"]') 315 + SELECT author_did, TRIM(value) AS tag FROM articles.annotations, json_each('["' || REPLACE(tags, ',', '","') || '"]') 316 316 WHERE tags IS NOT NULL AND tags != '' 317 317 ) 318 318 SELECT author_did, COUNT(DISTINCT tag) FROM user_tags GROUP BY author_did ··· 321 321 } 322 322 323 323 _, err = tx.ExecContext(ctx, ` 324 - INSERT OR IGNORE INTO user_similarity (user_a, user_b, jaccard, common_feeds, common_tags) 324 + INSERT OR IGNORE INTO recs.user_similarity (user_a, user_b, jaccard, common_feeds, common_tags) 325 325 SELECT user_a, user_b, 0, 0, 0 FROM _tag_overlap 326 326 `) 327 327 if err != nil { ··· 329 329 } 330 330 331 331 tagsUpdate := fmt.Sprintf(` 332 - UPDATE user_similarity SET 332 + UPDATE recs.user_similarity SET 333 333 jaccard = jaccard + %g * CAST(_tag_overlap.common AS REAL) / NULLIF( 334 - (SELECT cnt FROM _tag_count WHERE author_did = user_similarity.user_a) + 335 - (SELECT cnt FROM _tag_count WHERE author_did = user_similarity.user_b) - 334 + (SELECT cnt FROM 
_tag_count WHERE author_did = recs.user_similarity.user_a) + 335 + (SELECT cnt FROM _tag_count WHERE author_did = recs.user_similarity.user_b) - 336 336 CAST(_tag_overlap.common AS REAL), 337 337 0 338 338 ), 339 339 common_tags = _tag_overlap.common 340 340 FROM _tag_overlap 341 - WHERE user_similarity.user_a = _tag_overlap.user_a 342 - AND user_similarity.user_b = _tag_overlap.user_b 341 + WHERE recs.user_similarity.user_a = _tag_overlap.user_a 342 + AND recs.user_similarity.user_b = _tag_overlap.user_b 343 343 `, e.config.TagsWeight) 344 344 345 345 if _, err := tx.ExecContext(ctx, tagsUpdate); err != nil { ··· 347 347 } 348 348 349 349 followQuery := fmt.Sprintf(` 350 - INSERT INTO user_similarity (user_a, user_b, jaccard, common_feeds, common_likes, common_tags) 350 + INSERT INTO recs.user_similarity (user_a, user_b, jaccard, common_feeds, common_likes, common_tags) 351 351 SELECT 352 352 MIN(f.user_did, f.target_did), 353 353 MAX(f.user_did, f.target_did), 354 354 %g, 355 355 0, 0, 0 356 - FROM follows f 356 + FROM main.follows f 357 357 WHERE f.user_did != f.target_did 358 358 GROUP BY MIN(f.user_did, f.target_did), MAX(f.user_did, f.target_did) 359 359 ON CONFLICT(user_a, user_b) DO UPDATE SET ··· 366 366 367 367 e.logger.Info("user similarity computed") 368 368 return tx.Commit() 369 - } 369 + }
+49 -50
internal/cluster/scoring.go
··· 41 41 42 42 func (e *Engine) GetFeedRecommendations(ctx context.Context, userDID string, limit int) ([]*FeedRecommendation, error) { 43 43 subCount := 0 44 - _ = e.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM subscriptions WHERE user_did = ?`, userDID).Scan(&subCount) 44 + _ = e.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM articles.subscriptions WHERE user_did = ?`, userDID).Scan(&subCount) 45 45 46 46 if subCount < 5 { 47 47 recs, err := e.ColdStartRecommendations(ctx, userDID, limit*2) ··· 91 91 var dbW SignalWeights 92 92 err := e.db.QueryRowContext(ctx, ` 93 93 SELECT w_sub, w_like, w_tag, w_social, w_pop, w_category 94 - FROM user_signal_weights WHERE user_did = ? 94 + FROM recs.user_signal_weights WHERE user_did = ? 95 95 `, userDID).Scan(&dbW.WSub, &dbW.WLike, &dbW.WTag, &dbW.WSocial, &dbW.WPop, &dbW.WCategory) 96 96 if err == nil { 97 97 return dbW ··· 104 104 105 105 rows, err := e.db.QueryContext(ctx, ` 106 106 WITH similar_users AS ( 107 - SELECT user_b AS peer, jaccard FROM user_similarity WHERE user_a = ? AND jaccard > 0.15 107 + SELECT user_b AS peer, jaccard FROM recs.user_similarity WHERE user_a = ? AND jaccard > 0.15 108 108 UNION ALL 109 - SELECT user_a AS peer, jaccard FROM user_similarity WHERE user_b = ? AND jaccard > 0.15 109 + SELECT user_a AS peer, jaccard FROM recs.user_similarity WHERE user_b = ? AND jaccard > 0.15 110 110 ), 111 111 candidate_feeds AS ( 112 112 SELECT s.feed_url, 113 113 SUM(su.jaccard) AS sub_signal 114 114 FROM similar_users su 115 - JOIN subscriptions s ON s.user_did = su.peer 116 - WHERE s.feed_url NOT IN (SELECT feed_url FROM subscriptions WHERE user_did = ?) 117 - AND s.feed_url NOT IN (SELECT target_id FROM dismissed_recommendations WHERE user_did = ? AND target_type = 'feed') 115 + JOIN articles.subscriptions s ON s.user_did = su.peer 116 + WHERE s.feed_url NOT IN (SELECT feed_url FROM articles.subscriptions WHERE user_did = ?) 
117 + AND s.feed_url NOT IN (SELECT target_id FROM recs.dismissed_recommendations WHERE user_did = ? AND target_type = 'feed') 118 118 GROUP BY s.feed_url 119 119 ), 120 120 like_signals AS ( 121 121 SELECT s.feed_url, 122 122 SUM(su.jaccard * EXP(-0.023 * CAST(julianday('now') - julianday(l.created_at) AS REAL))) AS like_signal 123 123 FROM similar_users su 124 - JOIN likes l ON l.author_did = su.peer 125 - JOIN subscriptions s ON s.feed_url = l.feed_url 126 - WHERE s.feed_url NOT IN (SELECT feed_url FROM subscriptions WHERE user_did = ?) 127 - AND s.feed_url NOT IN (SELECT target_id FROM dismissed_recommendations WHERE user_did = ? AND target_type = 'feed') 124 + JOIN articles.likes l ON l.author_did = su.peer 125 + JOIN articles.subscriptions s ON s.feed_url = l.feed_url 126 + WHERE s.feed_url NOT IN (SELECT feed_url FROM articles.subscriptions WHERE user_did = ?) 127 + AND s.feed_url NOT IN (SELECT target_id FROM recs.dismissed_recommendations WHERE user_did = ? AND target_type = 'feed') 128 128 GROUP BY s.feed_url 129 129 ), 130 130 social_boost AS ( 131 131 SELECT s.feed_url, 132 132 SUM(CASE WHEN fd.distance = 1 THEN 1.0 ELSE 0.3 END) AS social 133 - FROM follow_distances fd 134 - JOIN subscriptions s ON s.user_did = fd.user_b 133 + FROM recs.follow_distances fd 134 + JOIN articles.subscriptions s ON s.user_did = fd.user_b 135 135 WHERE fd.user_a = ? 136 - AND s.feed_url NOT IN (SELECT feed_url FROM subscriptions WHERE user_did = ?) 137 - AND s.feed_url NOT IN (SELECT target_id FROM dismissed_recommendations WHERE user_did = ? AND target_type = 'feed') 136 + AND s.feed_url NOT IN (SELECT feed_url FROM articles.subscriptions WHERE user_did = ?) 137 + AND s.feed_url NOT IN (SELECT target_id FROM recs.dismissed_recommendations WHERE user_did = ? AND target_type = 'feed') 138 138 GROUP BY s.feed_url 139 139 ), 140 140 category_counts AS ( 141 141 SELECT category, COUNT(*) AS cnt 142 - FROM subscriptions WHERE user_did = ? 
AND category IS NOT NULL AND category != '' 142 + FROM articles.subscriptions WHERE user_did = ? AND category IS NOT NULL AND category != '' 143 143 GROUP BY category 144 144 ), 145 145 max_subs AS ( 146 - SELECT CAST(COALESCE(MAX(subscriber_count), 1) AS REAL) AS m FROM feeds 146 + SELECT CAST(COALESCE(MAX(subscriber_count), 1) AS REAL) AS m FROM articles.feeds 147 147 ) 148 148 SELECT cf.feed_url, COALESCE(f.title, ''), COALESCE(f.site_url, ''), 149 149 COALESCE(f.description, ''), f.subscriber_count, COALESCE(f.favicon_url, ''), ··· 157 157 ) THEN ? ELSE 0 END 158 158 AS score 159 159 FROM candidate_feeds cf 160 - JOIN feeds f ON f.feed_url = cf.feed_url 160 + JOIN articles.feeds f ON f.feed_url = cf.feed_url 161 161 LEFT JOIN like_signals ls ON ls.feed_url = cf.feed_url 162 162 LEFT JOIN social_boost sb ON sb.feed_url = cf.feed_url 163 163 CROSS JOIN max_subs ms ··· 187 187 188 188 rows, err := e.db.QueryContext(ctx, ` 189 189 WITH similar_users AS ( 190 - SELECT user_b AS peer, jaccard FROM user_similarity WHERE user_a = ? AND jaccard > 0.15 190 + SELECT user_b AS peer, jaccard FROM recs.user_similarity WHERE user_a = ? AND jaccard > 0.15 191 191 UNION ALL 192 - SELECT user_a AS peer, jaccard FROM user_similarity WHERE user_b = ? AND jaccard > 0.15 192 + SELECT user_a AS peer, jaccard FROM recs.user_similarity WHERE user_b = ? AND jaccard > 0.15 193 193 ), 194 194 liked_articles AS ( 195 195 SELECT l.feed_url, l.article_url, 196 196 SUM(su.jaccard * EXP(-0.023 * CAST(julianday('now') - julianday(l.created_at) AS REAL))) AS like_signal 197 197 FROM similar_users su 198 - JOIN likes l ON l.author_did = su.peer 198 + JOIN articles.likes l ON l.author_did = su.peer 199 199 WHERE NOT EXISTS ( 200 - SELECT 1 FROM likes ul WHERE ul.author_did = ? AND ul.feed_url = l.feed_url AND ul.article_url = l.article_url 200 + SELECT 1 FROM articles.likes ul WHERE ul.author_did = ? 
AND ul.feed_url = l.feed_url AND ul.article_url = l.article_url 201 201 ) 202 202 AND NOT EXISTS ( 203 - SELECT 1 FROM dismissed_recommendations d WHERE d.user_did = ? AND d.target_type = 'article' AND d.target_id = l.article_url 203 + SELECT 1 FROM recs.dismissed_recommendations d WHERE d.user_did = ? AND d.target_type = 'article' AND d.target_id = l.article_url 204 204 ) 205 205 GROUP BY l.feed_url, l.article_url 206 206 ), 207 207 social_likes AS ( 208 208 SELECT l.feed_url, l.article_url, 209 209 SUM(CASE WHEN fd.distance = 1 THEN 1.0 ELSE 0.3 END) AS social 210 - FROM follow_distances fd 211 - JOIN likes l ON l.author_did = fd.user_b 210 + FROM recs.follow_distances fd 211 + JOIN articles.likes l ON l.author_did = fd.user_b 212 212 WHERE fd.user_a = ? 213 213 AND NOT EXISTS ( 214 - SELECT 1 FROM likes ul WHERE ul.author_did = ? AND ul.feed_url = l.feed_url AND ul.article_url = l.article_url 214 + SELECT 1 FROM articles.likes ul WHERE ul.author_did = ? AND ul.feed_url = l.feed_url AND ul.article_url = l.article_url 215 215 ) 216 216 GROUP BY l.feed_url, l.article_url 217 217 ) ··· 223 223 + EXP(-0.023 * CAST(julianday('now') - julianday(a.published) AS REAL)) * 0.2 224 224 AS score 225 225 FROM liked_articles la 226 - JOIN articles a ON a.feed_url = la.feed_url AND a.url = la.article_url 227 - LEFT JOIN feeds f ON f.feed_url = la.feed_url 226 + JOIN articles.articles a ON a.feed_url = la.feed_url AND a.url = la.article_url 227 + LEFT JOIN articles.feeds f ON f.feed_url = la.feed_url 228 228 LEFT JOIN social_likes sl ON sl.feed_url = la.feed_url AND sl.article_url = la.article_url 229 - -- Future-published articles (e.g., scheduled) sort last 230 229 ORDER BY score DESC, (CASE WHEN a.published > 'now' THEN 1 ELSE 0 END), a.published DESC 231 230 LIMIT ? 
232 231 `, userDID, userDID, userDID, userDID, userDID, userDID, w.WLike, w.WSocial, limit) ··· 252 251 SELECT u.did, u.handle, COALESCE(u.display_name, ''), COALESCE(u.avatar_url, ''), 253 252 sim.jaccard, sim.common_feeds, COALESCE(sim.common_likes, 0), COALESCE(sim.common_tags, 0) 254 253 FROM ( 255 - SELECT user_b AS peer_did, jaccard, common_feeds, common_likes, common_tags FROM user_similarity WHERE user_a = ? 254 + SELECT user_b AS peer_did, jaccard, common_feeds, common_likes, common_tags FROM recs.user_similarity WHERE user_a = ? 256 255 UNION ALL 257 - SELECT user_a AS peer_did, jaccard, common_feeds, common_likes, common_tags FROM user_similarity WHERE user_b = ? 256 + SELECT user_a AS peer_did, jaccard, common_feeds, common_likes, common_tags FROM recs.user_similarity WHERE user_b = ? 258 257 ) sim 259 - JOIN users u ON u.did = sim.peer_did 258 + JOIN main.users u ON u.did = sim.peer_did 260 259 WHERE u.handle IS NOT NULL AND u.handle != '' 261 - AND EXISTS (SELECT 1 FROM subscriptions s JOIN feeds f ON s.feed_url = f.feed_url WHERE s.user_did = u.did AND f.subscriber_count > 0) 260 + AND EXISTS (SELECT 1 FROM articles.subscriptions s JOIN articles.feeds f ON s.feed_url = f.feed_url WHERE s.user_did = u.did AND f.subscriber_count > 0) 262 261 ORDER BY sim.jaccard DESC 263 262 LIMIT ? 
264 263 `, userDID, userDID, limit) ··· 286 285 } 287 286 defer func() { _ = tx.Rollback() }() 288 287 289 - if _, err := tx.ExecContext(ctx, `DELETE FROM user_signal_profiles`); err != nil { 288 + if _, err := tx.ExecContext(ctx, `DELETE FROM recs.user_signal_profiles`); err != nil { 290 289 return err 291 290 } 292 291 ··· 299 298 return err 300 299 } 301 300 if _, err := tx.ExecContext(ctx, ` 302 - INSERT INTO _user_like_counts SELECT author_did, COUNT(*) FROM likes GROUP BY author_did 301 + INSERT INTO _user_like_counts SELECT author_did, COUNT(*) FROM articles.likes GROUP BY author_did 303 302 `); err != nil { 304 303 return err 305 304 } ··· 316 315 INSERT INTO _user_tag_counts 317 316 WITH user_tags AS ( 318 317 SELECT author_did, TRIM(value) AS tag 319 - FROM annotations, json_each('["' || REPLACE(tags, ',', '","') || '"]') 318 + FROM articles.annotations, json_each('["' || REPLACE(tags, ',', '","') || '"]') 320 319 WHERE tags IS NOT NULL AND tags != '' 321 320 ) 322 321 SELECT author_did, COUNT(DISTINCT tag) FROM user_tags GROUP BY author_did ··· 337 336 SELECT user_did, '[' || GROUP_CONCAT('{"c":"' || category || '","n":"' || CAST(cnt AS TEXT) || '}') || ']' 338 337 FROM ( 339 338 SELECT user_did, category, COUNT(*) AS cnt 340 - FROM subscriptions 339 + FROM articles.subscriptions 341 340 WHERE category IS NOT NULL AND category != '' 342 341 GROUP BY user_did, category 343 342 ORDER BY COUNT(*) DESC ··· 349 348 } 350 349 351 350 _, err = tx.ExecContext(ctx, ` 352 - INSERT INTO user_signal_profiles (user_did, total_likes, total_tags, top_categories) 351 + INSERT INTO recs.user_signal_profiles (user_did, total_likes, total_tags, top_categories) 353 352 SELECT 354 353 u.did, 355 354 COALESCE(lc.cnt, 0), 356 355 COALESCE(tc.cnt, 0), 357 356 COALESCE(cc.categories, '[]') 358 - FROM users u 357 + FROM main.users u 359 358 LEFT JOIN _user_like_counts lc ON lc.user_did = u.did 360 359 LEFT JOIN _user_tag_counts tc ON tc.user_did = u.did 361 360 LEFT JOIN 
_user_top_categories cc ON cc.user_did = u.did ··· 370 369 371 370 func (e *Engine) ColdStartRecommendations(ctx context.Context, userDID string, limit int) ([]*FeedRecommendation, error) { 372 371 subCount := 0 373 - _ = e.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM subscriptions WHERE user_did = ?`, userDID).Scan(&subCount) 372 + _ = e.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM articles.subscriptions WHERE user_did = ?`, userDID).Scan(&subCount) 374 373 if subCount >= 5 { 375 374 return nil, nil 376 375 } ··· 378 377 rows, err := e.db.QueryContext(ctx, ` 379 378 WITH followed_feeds AS ( 380 379 SELECT s.feed_url, 1.0 AS weight 381 - FROM follow_distances fd 382 - JOIN subscriptions s ON s.user_did = fd.user_b 380 + FROM recs.follow_distances fd 381 + JOIN articles.subscriptions s ON s.user_did = fd.user_b 383 382 WHERE fd.user_a = ? AND fd.distance = 1 384 - AND s.feed_url NOT IN (SELECT feed_url FROM subscriptions WHERE user_did = ?) 385 - AND s.feed_url NOT IN (SELECT target_id FROM dismissed_recommendations WHERE user_did = ? AND target_type = 'feed') 383 + AND s.feed_url NOT IN (SELECT feed_url FROM articles.subscriptions WHERE user_did = ?) 384 + AND s.feed_url NOT IN (SELECT target_id FROM recs.dismissed_recommendations WHERE user_did = ? AND target_type = 'feed') 386 385 ), 387 386 popular_feeds AS ( 388 387 SELECT feed_url, subscriber_count, 389 - LOG(1 + CAST(subscriber_count AS REAL)) / LOG(1 + CAST((SELECT COALESCE(MAX(subscriber_count), 1) FROM feeds) AS REAL)) AS pop_score 390 - FROM feeds 388 + LOG(1 + CAST(subscriber_count AS REAL)) / LOG(1 + CAST((SELECT COALESCE(MAX(subscriber_count), 1) FROM articles.feeds) AS REAL)) AS pop_score 389 + FROM articles.feeds 391 390 WHERE subscriber_count > 0 392 - AND feed_url NOT IN (SELECT feed_url FROM subscriptions WHERE user_did = ?) 393 - AND feed_url NOT IN (SELECT target_id FROM dismissed_recommendations WHERE user_did = ? 
AND target_type = 'feed') 391 + AND feed_url NOT IN (SELECT feed_url FROM articles.subscriptions WHERE user_did = ?) 392 + AND feed_url NOT IN (SELECT target_id FROM recs.dismissed_recommendations WHERE user_did = ? AND target_type = 'feed') 394 393 ORDER BY subscriber_count DESC 395 394 LIMIT 50 396 395 ), ··· 410 409 COALESCE(f.favicon_url, ''), 411 410 ac.weight AS score 412 411 FROM all_candidates ac 413 - JOIN feeds f ON f.feed_url = ac.feed_url 412 + JOIN articles.feeds f ON f.feed_url = ac.feed_url 414 413 ORDER BY score DESC 415 414 LIMIT ? 416 415 `, userDID, userDID, userDID, userDID, userDID, limit) ··· 429 428 results = append(results, rec) 430 429 } 431 430 return results, rows.Err() 432 - } 431 + }
+8 -8
internal/cluster/social.go
··· 11 11 } 12 12 defer func() { _ = tx.Rollback() }() 13 13 14 - if _, err := tx.ExecContext(ctx, `DELETE FROM follow_distances`); err != nil { 14 + if _, err := tx.ExecContext(ctx, `DELETE FROM recs.follow_distances`); err != nil { 15 15 return err 16 16 } 17 17 18 18 _, err = tx.ExecContext(ctx, ` 19 - INSERT INTO follow_distances (user_a, user_b, distance) 19 + INSERT INTO recs.follow_distances (user_a, user_b, distance) 20 20 SELECT user_a, user_b, MIN(distance) FROM ( 21 - SELECT user_did AS user_a, target_did AS user_b, 1 AS distance FROM follows WHERE user_did != target_did 21 + SELECT user_did AS user_a, target_did AS user_b, 1 AS distance FROM main.follows WHERE user_did != target_did 22 22 UNION ALL 23 23 SELECT f1.user_did, f2.target_did, 2 24 - FROM follows f1 25 - JOIN follows f2 ON f1.target_did = f2.user_did 24 + FROM main.follows f1 25 + JOIN main.follows f2 ON f1.target_did = f2.user_did 26 26 WHERE f1.user_did != f2.target_did 27 27 ) GROUP BY user_a, user_b 28 28 `) ··· 37 37 func (e *Engine) ComputeFollowDistancesIncremental(ctx context.Context) error { 38 38 var maxFollowed string 39 39 err := e.db.QueryRowContext(ctx, ` 40 - SELECT COALESCE(MAX(followed_at), '1970-01-01') FROM follows 40 + SELECT COALESCE(MAX(followed_at), '1970-01-01') FROM main.follows 41 41 `).Scan(&maxFollowed) 42 42 if err != nil { 43 43 return err ··· 45 45 46 46 var lastComputed string 47 47 err = e.db.QueryRowContext(ctx, ` 48 - SELECT COALESCE(MAX(computed_at), '1970-01-01') FROM user_similarity 48 + SELECT COALESCE(MAX(computed_at), '1970-01-01') FROM recs.user_similarity 49 49 `).Scan(&lastComputed) 50 50 if err != nil { 51 51 return err ··· 56 56 } 57 57 58 58 return e.ComputeFollowDistances(ctx) 59 - } 59 + }
+5 -5
internal/cluster/weights.go
··· 22 22 func (e *Engine) adjustWeight(ctx context.Context, userDID string, signal string, delta float64) { 23 23 var actedCount int 24 24 _ = e.db.QueryRowContext(ctx, ` 25 - SELECT COUNT(*) FROM recommendation_impressions WHERE user_did = ? AND acted = 1 25 + SELECT COUNT(*) FROM recs.recommendation_impressions WHERE user_did = ? AND acted = 1 26 26 `, userDID).Scan(&actedCount) 27 27 if actedCount < minActionsTune { 28 28 return 29 29 } 30 30 31 31 var exists int 32 - _ = e.db.QueryRowContext(ctx, `SELECT 1 FROM user_signal_weights WHERE user_did = ?`, userDID).Scan(&exists) 32 + _ = e.db.QueryRowContext(ctx, `SELECT 1 FROM recs.user_signal_weights WHERE user_did = ?`, userDID).Scan(&exists) 33 33 34 34 if exists == 0 { 35 35 _, _ = e.db.ExecContext(ctx, ` 36 - INSERT INTO user_signal_weights (user_did, w_sub, w_like, w_tag, w_social, w_pop, w_category) 36 + INSERT INTO recs.user_signal_weights (user_did, w_sub, w_like, w_tag, w_social, w_pop, w_category) 37 37 VALUES (?, 1.0, 0.5, 0.3, 0.7, 0.2, 0.4) 38 38 `, userDID) 39 39 } ··· 45 45 46 46 adj := learningRate * delta 47 47 _, _ = e.db.ExecContext(ctx, ` 48 - UPDATE user_signal_weights SET 48 + UPDATE recs.user_signal_weights SET 49 49 `+column+` = MAX(?, MIN(?, `+column+` * (1 + ?))), 50 50 updated_at = CURRENT_TIMESTAMP 51 51 WHERE user_did = ? ··· 90 90 } 91 91 } 92 92 return best 93 - } 93 + }
+43 -46
internal/db/db.go
··· 9 9 "github.com/mattn/go-sqlite3" 10 10 ) 11 11 12 + const DSN = "_journal_mode=WAL&_busy_timeout=5000&_synchronous=NORMAL&_cache=shared" 13 + 12 14 func init() { 13 15 sql.Register("sqlite3_glean", &sqlite3.SQLiteDriver{ 14 16 ConnectHook: func(conn *sqlite3.SQLiteConn) error { ··· 61 63 } 62 64 63 65 func Open(path string) (*DB, error) { 64 - db, err := sql.Open("sqlite3_glean", path+"?_journal_mode=WAL&_busy_timeout=30000&_synchronous=NORMAL&_cache_size=-64000&_stmt_cache_size=64&_mutex=no") 66 + db, err := sql.Open("sqlite3_glean", path+"?cache=shared&"+DSN) 65 67 if err != nil { 66 68 return nil, err 67 69 } ··· 70 72 db.SetMaxIdleConns(5) 71 73 db.SetConnMaxLifetime(30 * time.Minute) 72 74 73 - if err := initSchema(db); err != nil { 75 + wrapped := &DB{db} 76 + if err := initSchema(wrapped); err != nil { 74 77 db.Close() 75 78 return nil, err 76 79 } 77 80 78 - return &DB{db}, nil 81 + return wrapped, nil 79 82 } 80 83 81 - func initSchema(db *sql.DB) error { 84 + func initSchema(db *DB) error { 82 85 for _, s := range schema { 83 86 if _, err := db.Exec(s); err != nil { 84 87 return err ··· 115 118 )`, 116 119 `CREATE TABLE IF NOT EXISTS subscriptions ( 117 120 id INTEGER PRIMARY KEY AUTOINCREMENT, 118 - user_did TEXT NOT NULL REFERENCES users(did), 119 - feed_url TEXT NOT NULL REFERENCES feeds(feed_url), 121 + user_did TEXT NOT NULL, 122 + feed_url TEXT NOT NULL, 120 123 title TEXT, 121 124 category TEXT, 122 125 added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, ··· 126 129 )`, 127 130 `CREATE TABLE IF NOT EXISTS articles ( 128 131 id INTEGER PRIMARY KEY AUTOINCREMENT, 129 - feed_url TEXT NOT NULL REFERENCES feeds(feed_url), 132 + feed_url TEXT NOT NULL, 130 133 guid TEXT NOT NULL, 131 134 title TEXT NOT NULL DEFAULT '', 132 135 url TEXT, ··· 140 143 UNIQUE(feed_url, guid) 141 144 )`, 142 145 `CREATE TABLE IF NOT EXISTS read_state ( 143 - user_did TEXT NOT NULL REFERENCES users(did), 144 - article_id INTEGER NOT NULL REFERENCES articles(id), 146 + 
user_did TEXT NOT NULL, 147 + article_id INTEGER NOT NULL, 145 148 is_read BOOLEAN NOT NULL DEFAULT 0, 146 149 read_at DATETIME, 147 150 PRIMARY KEY (user_did, article_id) ··· 149 152 `CREATE TABLE IF NOT EXISTS annotations ( 150 153 id INTEGER PRIMARY KEY AUTOINCREMENT, 151 154 uri TEXT NOT NULL UNIQUE, 152 - author_did TEXT NOT NULL REFERENCES users(did), 155 + author_did TEXT NOT NULL, 153 156 feed_url TEXT NOT NULL, 154 157 article_url TEXT NOT NULL, 155 158 quote TEXT, ··· 162 165 `CREATE TABLE IF NOT EXISTS likes ( 163 166 id INTEGER PRIMARY KEY AUTOINCREMENT, 164 167 uri TEXT NOT NULL UNIQUE, 165 - author_did TEXT NOT NULL REFERENCES users(did), 168 + author_did TEXT NOT NULL, 166 169 feed_url TEXT NOT NULL, 167 170 article_url TEXT NOT NULL, 168 171 created_at DATETIME NOT NULL, ··· 170 173 UNIQUE(author_did, feed_url, article_url) 171 174 )`, 172 175 `CREATE TABLE IF NOT EXISTS feed_similarity ( 173 - feed_a TEXT NOT NULL REFERENCES feeds(feed_url), 174 - feed_b TEXT NOT NULL REFERENCES feeds(feed_url), 176 + feed_a TEXT NOT NULL, 177 + feed_b TEXT NOT NULL, 175 178 jaccard REAL NOT NULL, 176 179 computed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, 177 180 PRIMARY KEY (feed_a, feed_b), 178 181 CHECK(feed_a < feed_b) 179 182 )`, 180 183 `CREATE TABLE IF NOT EXISTS user_similarity ( 181 - user_a TEXT NOT NULL REFERENCES users(did), 182 - user_b TEXT NOT NULL REFERENCES users(did), 184 + user_a TEXT NOT NULL, 185 + user_b TEXT NOT NULL, 183 186 jaccard REAL NOT NULL, 184 187 common_feeds INTEGER NOT NULL, 185 188 common_likes INTEGER NOT NULL DEFAULT 0, ··· 189 192 CHECK(user_a < user_b) 190 193 )`, 191 194 `CREATE TABLE IF NOT EXISTS follows ( 192 - user_did TEXT NOT NULL REFERENCES users(did), 195 + user_did TEXT NOT NULL, 193 196 target_did TEXT NOT NULL, 194 197 uri TEXT, 195 198 cid TEXT, ··· 206 209 data TEXT NOT NULL, 207 210 PRIMARY KEY (account_did, session_id) 208 211 )`, 209 - `CREATE INDEX IF NOT EXISTS idx_subscriptions_feed ON 
subscriptions(feed_url)`, 210 - `CREATE INDEX IF NOT EXISTS idx_subscriptions_feed_user ON subscriptions(feed_url, user_did)`, 211 - `CREATE INDEX IF NOT EXISTS idx_subscriptions_user ON subscriptions(user_did)`, 212 - `CREATE INDEX IF NOT EXISTS idx_subscriptions_uri ON subscriptions(uri)`, 213 - `CREATE INDEX IF NOT EXISTS idx_likes_author_feed ON likes(author_did, feed_url, created_at)`, 214 - `CREATE INDEX IF NOT EXISTS idx_articles_feed ON articles(feed_url)`, 215 - `CREATE INDEX IF NOT EXISTS idx_articles_published ON articles(published DESC)`, 216 - `CREATE INDEX IF NOT EXISTS idx_articles_url ON articles(url)`, 217 - `CREATE INDEX IF NOT EXISTS idx_read_state_unread ON read_state(user_did, is_read) WHERE is_read = 0`, 218 - `CREATE INDEX IF NOT EXISTS idx_annotations_article ON annotations(article_url)`, 219 - `CREATE INDEX IF NOT EXISTS idx_annotations_author ON annotations(author_did)`, 220 - `CREATE INDEX IF NOT EXISTS idx_annotations_created_at ON annotations(created_at DESC)`, 221 - `CREATE INDEX IF NOT EXISTS idx_likes_article ON likes(feed_url, article_url)`, 222 - `CREATE INDEX IF NOT EXISTS idx_likes_author ON likes(author_did)`, 223 - `CREATE INDEX IF NOT EXISTS idx_likes_created_at ON likes(created_at DESC)`, 224 - `CREATE INDEX IF NOT EXISTS idx_follows_user ON follows(user_did)`, 225 - `CREATE INDEX IF NOT EXISTS idx_follows_target ON follows(target_did)`, 226 - `CREATE INDEX IF NOT EXISTS idx_follows_uri ON follows(uri)`, 227 - `CREATE INDEX IF NOT EXISTS idx_user_similarity_b ON user_similarity(user_b)`, 228 - `CREATE INDEX IF NOT EXISTS idx_user_similarity_a ON user_similarity(user_a)`, 229 - 230 212 `CREATE TABLE IF NOT EXISTS dismissed_recommendations ( 231 - user_did TEXT NOT NULL REFERENCES users(did), 213 + user_did TEXT NOT NULL, 232 214 target_type TEXT NOT NULL CHECK(target_type IN ('feed', 'article')), 233 215 target_id TEXT NOT NULL, 234 216 reason TEXT, 235 217 dismissed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, 236 218 
PRIMARY KEY (user_did, target_type, target_id) 237 219 )`, 238 - 239 220 `CREATE TABLE IF NOT EXISTS recommendation_impressions ( 240 - user_did TEXT NOT NULL REFERENCES users(did), 221 + user_did TEXT NOT NULL, 241 222 target_type TEXT NOT NULL CHECK(target_type IN ('feed', 'article')), 242 223 target_id TEXT NOT NULL, 243 224 first_shown_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, ··· 246 227 acted BOOLEAN NOT NULL DEFAULT 0, 247 228 PRIMARY KEY (user_did, target_type, target_id) 248 229 )`, 249 - 250 230 `CREATE TABLE IF NOT EXISTS follow_distances ( 251 231 user_a TEXT NOT NULL, 252 232 user_b TEXT NOT NULL, 253 233 distance INTEGER NOT NULL CHECK(distance IN (1, 2)), 254 234 PRIMARY KEY (user_a, user_b) 255 235 )`, 256 - 257 236 `CREATE TABLE IF NOT EXISTS user_signal_weights ( 258 - user_did TEXT PRIMARY KEY REFERENCES users(did), 237 + user_did TEXT PRIMARY KEY, 259 238 w_sub REAL NOT NULL DEFAULT 1.0, 260 239 w_like REAL NOT NULL DEFAULT 0.5, 261 240 w_tag REAL NOT NULL DEFAULT 0.3, ··· 264 243 w_category REAL NOT NULL DEFAULT 0.4, 265 244 updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP 266 245 )`, 267 - 268 246 `CREATE TABLE IF NOT EXISTS user_signal_profiles ( 269 - user_did TEXT PRIMARY KEY REFERENCES users(did), 247 + user_did TEXT PRIMARY KEY, 270 248 total_likes INTEGER NOT NULL DEFAULT 0, 271 249 total_tags INTEGER NOT NULL DEFAULT 0, 272 250 top_categories TEXT, 273 251 updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP 274 252 )`, 275 - 253 + `CREATE INDEX IF NOT EXISTS idx_subscriptions_feed ON subscriptions(feed_url)`, 254 + `CREATE INDEX IF NOT EXISTS idx_subscriptions_feed_user ON subscriptions(feed_url, user_did)`, 255 + `CREATE INDEX IF NOT EXISTS idx_subscriptions_user ON subscriptions(user_did)`, 256 + `CREATE INDEX IF NOT EXISTS idx_subscriptions_uri ON subscriptions(uri)`, 257 + `CREATE INDEX IF NOT EXISTS idx_likes_author_feed ON likes(author_did, feed_url, created_at)`, 258 + `CREATE INDEX IF NOT EXISTS idx_articles_feed 
ON articles(feed_url)`, 259 + `CREATE INDEX IF NOT EXISTS idx_articles_published ON articles(published DESC)`, 260 + `CREATE INDEX IF NOT EXISTS idx_articles_url ON articles(url)`, 261 + `CREATE INDEX IF NOT EXISTS idx_read_state_unread ON read_state(user_did, is_read) WHERE is_read = 0`, 262 + `CREATE INDEX IF NOT EXISTS idx_annotations_article ON annotations(article_url)`, 263 + `CREATE INDEX IF NOT EXISTS idx_annotations_author ON annotations(author_did)`, 264 + `CREATE INDEX IF NOT EXISTS idx_annotations_created_at ON annotations(created_at DESC)`, 265 + `CREATE INDEX IF NOT EXISTS idx_likes_article ON likes(feed_url, article_url)`, 266 + `CREATE INDEX IF NOT EXISTS idx_likes_author ON likes(author_did)`, 267 + `CREATE INDEX IF NOT EXISTS idx_likes_created_at ON likes(created_at DESC)`, 268 + `CREATE INDEX IF NOT EXISTS idx_follows_user ON follows(user_did)`, 269 + `CREATE INDEX IF NOT EXISTS idx_follows_target ON follows(target_did)`, 270 + `CREATE INDEX IF NOT EXISTS idx_follows_uri ON follows(uri)`, 271 + `CREATE INDEX IF NOT EXISTS idx_user_similarity_b ON user_similarity(user_b)`, 272 + `CREATE INDEX IF NOT EXISTS idx_user_similarity_a ON user_similarity(user_a)`, 276 273 `CREATE INDEX IF NOT EXISTS idx_dismissed_user_type ON dismissed_recommendations(user_did, target_type)`, 277 274 `CREATE INDEX IF NOT EXISTS idx_impressions_user_unacted ON recommendation_impressions(user_did, acted, shown_count)`, 278 275 `CREATE INDEX IF NOT EXISTS idx_impressions_last_shown ON recommendation_impressions(last_shown_at)`,
+367
internal/db/multi.go
··· 1 + package db 2 + 3 + import ( 4 + "database/sql" 5 + "fmt" 6 + "math" 7 + "sync/atomic" 8 + "time" 9 + 10 + "github.com/mattn/go-sqlite3" 11 + ) 12 + 13 + type Databases struct { 14 + Users *DB 15 + Articles *DB 16 + Recs *DB 17 + } 18 + 19 + var multiDriverSeq int64 20 + 21 + func OpenAll(basePath string) (*Databases, error) { 22 + articlesPath := basePath + "_articles" 23 + recsPath := basePath + "_recs" 24 + 25 + seq := atomic.AddInt64(&multiDriverSeq, 1) 26 + driverName := fmt.Sprintf("sqlite3_glean_multi_%d", seq) 27 + 28 + sql.Register(driverName, &sqlite3.SQLiteDriver{ 29 + ConnectHook: func(conn *sqlite3.SQLiteConn) error { 30 + if err := conn.RegisterFunc("exp", func(x float64) float64 { return math.Exp(x) }, true); err != nil { 31 + return err 32 + } 33 + if err := conn.RegisterFunc("log", func(x float64) float64 { return math.Log(x) }, true); err != nil { 34 + return err 35 + } 36 + for _, p := range []string{ 37 + `PRAGMA wal_autocheckpoint = 1000`, 38 + `PRAGMA temp_store = MEMORY`, 39 + `PRAGMA mmap_size = 268435456`, 40 + } { 41 + if _, err := conn.Exec(p, nil); err != nil { 42 + return err 43 + } 44 + } 45 + if _, err := conn.Exec(fmt.Sprintf("ATTACH DATABASE '%s' AS articles", articlesPath), nil); err != nil { 46 + return err 47 + } 48 + if _, err := conn.Exec(fmt.Sprintf("ATTACH DATABASE '%s' AS recs", recsPath), nil); err != nil { 49 + return err 50 + } 51 + return nil 52 + }, 53 + }) 54 + 55 + usersDB, err := sql.Open(driverName, basePath+"_users?cache=shared&"+DSN) 56 + if err != nil { 57 + return nil, err 58 + } 59 + usersDB.SetMaxOpenConns(10) 60 + usersDB.SetMaxIdleConns(5) 61 + usersDB.SetConnMaxLifetime(30 * time.Minute) 62 + users := &DB{usersDB} 63 + 64 + articles, err := Open(articlesPath) 65 + if err != nil { 66 + users.Close() 67 + return nil, err 68 + } 69 + 70 + recs, err := Open(recsPath) 71 + if err != nil { 72 + users.Close() 73 + articles.Close() 74 + return nil, err 75 + } 76 + 77 + if err := initUsersSchema(users); err 
!= nil { 78 + users.Close() 79 + articles.Close() 80 + recs.Close() 81 + return nil, err 82 + } 83 + 84 + if err := initArticlesSchema(articles); err != nil { 85 + users.Close() 86 + articles.Close() 87 + recs.Close() 88 + return nil, err 89 + } 90 + 91 + if err := initRecsSchema(recs); err != nil { 92 + users.Close() 93 + articles.Close() 94 + recs.Close() 95 + return nil, err 96 + } 97 + 98 + return &Databases{ 99 + Users: users, 100 + Articles: articles, 101 + Recs: recs, 102 + }, nil 103 + } 104 + 105 + func (d *Databases) Close() error { 106 + if d.Users != nil { 107 + _ = d.Users.Close() 108 + } 109 + if d.Articles != nil { 110 + _ = d.Articles.Close() 111 + } 112 + if d.Recs != nil { 113 + _ = d.Recs.Close() 114 + } 115 + return nil 116 + } 117 + 118 + func initUsersSchema(db *DB) error { 119 + for _, s := range usersSchema { 120 + if _, err := db.Exec(s); err != nil { 121 + return err 122 + } 123 + } 124 + return nil 125 + } 126 + 127 + func initArticlesSchema(db *DB) error { 128 + for _, s := range articlesSchema { 129 + if _, err := db.Exec(s); err != nil { 130 + return err 131 + } 132 + } 133 + return nil 134 + } 135 + 136 + func initRecsSchema(db *DB) error { 137 + for _, s := range recsSchema { 138 + if _, err := db.Exec(s); err != nil { 139 + return err 140 + } 141 + } 142 + return nil 143 + } 144 + 145 + var usersSchema = []string{ 146 + `CREATE TABLE IF NOT EXISTS users ( 147 + did TEXT PRIMARY KEY, 148 + handle TEXT NOT NULL, 149 + display_name TEXT, 150 + avatar_url TEXT, 151 + indexed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, 152 + updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP 153 + )`, 154 + 155 + `CREATE TABLE IF NOT EXISTS follows ( 156 + user_did TEXT NOT NULL, 157 + target_did TEXT NOT NULL, 158 + uri TEXT, 159 + cid TEXT, 160 + followed_at DATETIME, 161 + PRIMARY KEY (user_did, target_did) 162 + )`, 163 + 164 + `CREATE TABLE IF NOT EXISTS oauth_auth_requests ( 165 + state TEXT PRIMARY KEY, 166 + data TEXT NOT NULL 167 + )`, 
168 + 169 + `CREATE TABLE IF NOT EXISTS oauth_sessions ( 170 + account_did TEXT NOT NULL, 171 + session_id TEXT NOT NULL, 172 + data TEXT NOT NULL, 173 + PRIMARY KEY (account_did, session_id) 174 + )`, 175 + 176 + `CREATE INDEX IF NOT EXISTS idx_follows_user ON follows(user_did)`, 177 + `CREATE INDEX IF NOT EXISTS idx_follows_target ON follows(target_did)`, 178 + `CREATE INDEX IF NOT EXISTS idx_follows_uri ON follows(uri)`, 179 + `CREATE INDEX IF NOT EXISTS idx_follows_followed_at ON follows(followed_at)`, 180 + `CREATE INDEX IF NOT EXISTS idx_users_handle ON users(handle)`, 181 + } 182 + 183 + var articlesSchema = []string{ 184 + `CREATE TABLE IF NOT EXISTS feeds ( 185 + feed_url TEXT PRIMARY KEY, 186 + title TEXT, 187 + site_url TEXT, 188 + description TEXT, 189 + feed_type TEXT CHECK(feed_type IN ('rss', 'atom', 'json')), 190 + last_fetched_at DATETIME, 191 + last_error TEXT, 192 + subscriber_count INTEGER NOT NULL DEFAULT 0, 193 + etag TEXT, 194 + last_modified TEXT, 195 + fetch_interval_minutes INTEGER NOT NULL DEFAULT 30, 196 + next_fetch_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, 197 + consecutive_empty_fetches INTEGER NOT NULL DEFAULT 0, 198 + error_count INTEGER NOT NULL DEFAULT 0, 199 + favicon_url TEXT 200 + )`, 201 + 202 + `CREATE TABLE IF NOT EXISTS subscriptions ( 203 + id INTEGER PRIMARY KEY AUTOINCREMENT, 204 + user_did TEXT NOT NULL, 205 + feed_url TEXT NOT NULL, 206 + title TEXT, 207 + category TEXT, 208 + added_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, 209 + uri TEXT, 210 + cid TEXT, 211 + UNIQUE(user_did, feed_url) 212 + )`, 213 + 214 + `CREATE TABLE IF NOT EXISTS articles ( 215 + id INTEGER PRIMARY KEY AUTOINCREMENT, 216 + feed_url TEXT NOT NULL, 217 + guid TEXT NOT NULL, 218 + title TEXT NOT NULL DEFAULT '', 219 + url TEXT, 220 + author TEXT, 221 + summary TEXT, 222 + content TEXT, 223 + full_content TEXT, 224 + published DATETIME, 225 + updated DATETIME, 226 + fetched_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, 227 + 
UNIQUE(feed_url, guid) 228 + )`, 229 + 230 + `CREATE TABLE IF NOT EXISTS read_state ( 231 + user_did TEXT NOT NULL, 232 + article_id INTEGER NOT NULL, 233 + is_read BOOLEAN NOT NULL DEFAULT 0, 234 + read_at DATETIME, 235 + PRIMARY KEY (user_did, article_id) 236 + )`, 237 + 238 + `CREATE TABLE IF NOT EXISTS annotations ( 239 + id INTEGER PRIMARY KEY AUTOINCREMENT, 240 + uri TEXT NOT NULL UNIQUE, 241 + author_did TEXT NOT NULL, 242 + feed_url TEXT NOT NULL, 243 + article_url TEXT NOT NULL, 244 + quote TEXT, 245 + note TEXT, 246 + tags TEXT, 247 + rating INTEGER, 248 + created_at DATETIME NOT NULL, 249 + cid TEXT 250 + )`, 251 + 252 + `CREATE TABLE IF NOT EXISTS likes ( 253 + id INTEGER PRIMARY KEY AUTOINCREMENT, 254 + uri TEXT NOT NULL UNIQUE, 255 + author_did TEXT NOT NULL, 256 + feed_url TEXT NOT NULL, 257 + article_url TEXT NOT NULL, 258 + created_at DATETIME NOT NULL, 259 + cid TEXT, 260 + UNIQUE(author_did, feed_url, article_url) 261 + )`, 262 + 263 + `CREATE INDEX IF NOT EXISTS idx_subscriptions_feed ON subscriptions(feed_url)`, 264 + `CREATE INDEX IF NOT EXISTS idx_subscriptions_feed_user ON subscriptions(feed_url, user_did)`, 265 + `CREATE INDEX IF NOT EXISTS idx_subscriptions_user ON subscriptions(user_did)`, 266 + `CREATE INDEX IF NOT EXISTS idx_subscriptions_uri ON subscriptions(uri)`, 267 + `CREATE INDEX IF NOT EXISTS idx_likes_author_feed ON likes(author_did, feed_url, created_at)`, 268 + `CREATE INDEX IF NOT EXISTS idx_articles_feed ON articles(feed_url)`, 269 + `CREATE INDEX IF NOT EXISTS idx_articles_published ON articles(published DESC)`, 270 + `CREATE INDEX IF NOT EXISTS idx_articles_url ON articles(url)`, 271 + `CREATE INDEX IF NOT EXISTS idx_read_state_unread ON read_state(user_did, is_read) WHERE is_read = 0`, 272 + `CREATE INDEX IF NOT EXISTS idx_annotations_article ON annotations(article_url)`, 273 + `CREATE INDEX IF NOT EXISTS idx_annotations_author ON annotations(author_did)`, 274 + `CREATE INDEX IF NOT EXISTS idx_annotations_created_at ON 
annotations(created_at DESC)`, 275 + `CREATE INDEX IF NOT EXISTS idx_likes_article ON likes(feed_url, article_url)`, 276 + `CREATE INDEX IF NOT EXISTS idx_likes_author ON likes(author_did)`, 277 + `CREATE INDEX IF NOT EXISTS idx_likes_created_at ON likes(created_at DESC)`, 278 + 279 + `CREATE VIRTUAL TABLE IF NOT EXISTS articles_fts USING fts5(title, summary, content, author, content=articles, content_rowid=id)`, 280 + `CREATE TRIGGER IF NOT EXISTS articles_ai AFTER INSERT ON articles BEGIN 281 + INSERT INTO articles_fts(rowid, title, summary, content, author) VALUES (new.id, new.title, new.summary, new.content, new.author); 282 + END`, 283 + `CREATE TRIGGER IF NOT EXISTS articles_ad AFTER DELETE ON articles BEGIN 284 + INSERT INTO articles_fts(articles_fts, rowid, title, summary, content, author) VALUES('delete', old.id, old.title, old.summary, old.content, old.author); 285 + END`, 286 + `CREATE TRIGGER IF NOT EXISTS articles_au AFTER UPDATE ON articles BEGIN 287 + INSERT INTO articles_fts(articles_fts, rowid, title, summary, content, author) VALUES('delete', old.id, old.title, old.summary, old.content, old.author); 288 + INSERT INTO articles_fts(rowid, title, summary, content, author) VALUES (new.id, new.title, new.summary, new.content, new.author); 289 + END`, 290 + } 291 + 292 + var recsSchema = []string{ 293 + `CREATE TABLE IF NOT EXISTS feed_similarity ( 294 + feed_a TEXT NOT NULL, 295 + feed_b TEXT NOT NULL, 296 + jaccard REAL NOT NULL, 297 + computed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, 298 + PRIMARY KEY (feed_a, feed_b), 299 + CHECK(feed_a < feed_b) 300 + )`, 301 + 302 + `CREATE TABLE IF NOT EXISTS user_similarity ( 303 + user_a TEXT NOT NULL, 304 + user_b TEXT NOT NULL, 305 + jaccard REAL NOT NULL, 306 + common_feeds INTEGER NOT NULL, 307 + common_likes INTEGER NOT NULL DEFAULT 0, 308 + common_tags INTEGER NOT NULL DEFAULT 0, 309 + computed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, 310 + PRIMARY KEY (user_a, user_b), 311 + CHECK(user_a < 
user_b) 312 + )`, 313 + 314 + `CREATE TABLE IF NOT EXISTS dismissed_recommendations ( 315 + user_did TEXT NOT NULL, 316 + target_type TEXT NOT NULL CHECK(target_type IN ('feed', 'article')), 317 + target_id TEXT NOT NULL, 318 + reason TEXT, 319 + dismissed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, 320 + PRIMARY KEY (user_did, target_type, target_id) 321 + )`, 322 + 323 + `CREATE TABLE IF NOT EXISTS recommendation_impressions ( 324 + user_did TEXT NOT NULL, 325 + target_type TEXT NOT NULL CHECK(target_type IN ('feed', 'article')), 326 + target_id TEXT NOT NULL, 327 + first_shown_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, 328 + last_shown_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, 329 + shown_count INTEGER NOT NULL DEFAULT 1, 330 + acted BOOLEAN NOT NULL DEFAULT 0, 331 + PRIMARY KEY (user_did, target_type, target_id) 332 + )`, 333 + 334 + `CREATE TABLE IF NOT EXISTS follow_distances ( 335 + user_a TEXT NOT NULL, 336 + user_b TEXT NOT NULL, 337 + distance INTEGER NOT NULL CHECK(distance IN (1, 2)), 338 + PRIMARY KEY (user_a, user_b) 339 + )`, 340 + 341 + `CREATE TABLE IF NOT EXISTS user_signal_weights ( 342 + user_did TEXT PRIMARY KEY, 343 + w_sub REAL NOT NULL DEFAULT 1.0, 344 + w_like REAL NOT NULL DEFAULT 0.5, 345 + w_tag REAL NOT NULL DEFAULT 0.3, 346 + w_social REAL NOT NULL DEFAULT 0.7, 347 + w_pop REAL NOT NULL DEFAULT 0.2, 348 + w_category REAL NOT NULL DEFAULT 0.4, 349 + updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP 350 + )`, 351 + 352 + `CREATE TABLE IF NOT EXISTS user_signal_profiles ( 353 + user_did TEXT PRIMARY KEY, 354 + total_likes INTEGER NOT NULL DEFAULT 0, 355 + total_tags INTEGER NOT NULL DEFAULT 0, 356 + top_categories TEXT, 357 + updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP 358 + )`, 359 + 360 + `CREATE INDEX IF NOT EXISTS idx_dismissed_user_type ON dismissed_recommendations(user_did, target_type)`, 361 + `CREATE INDEX IF NOT EXISTS idx_impressions_user_unacted ON recommendation_impressions(user_did, acted, 
shown_count)`, 362 + `CREATE INDEX IF NOT EXISTS idx_impressions_last_shown ON recommendation_impressions(last_shown_at)`, 363 + `CREATE INDEX IF NOT EXISTS idx_follow_distances_b ON follow_distances(user_b)`, 364 + `CREATE INDEX IF NOT EXISTS idx_follow_distances_a_dist ON follow_distances(user_a, distance)`, 365 + `CREATE INDEX IF NOT EXISTS idx_user_similarity_b ON user_similarity(user_b)`, 366 + `CREATE INDEX IF NOT EXISTS idx_user_similarity_a ON user_similarity(user_a)`, 367 + }
+3
internal/feed/fetcher.go
··· 101 101 ticker := time.NewTicker(s.tickInterval) 102 102 defer ticker.Stop() 103 103 104 + // fetch all at startup 105 + s.fetchAll(ctx) 106 + 104 107 for { 105 108 select { 106 109 case <-ctx.Done():
+20 -11
internal/server/annotations_handler.go
··· 16 16 17 17 func (s *Server) handleLibrary(w http.ResponseWriter, r *http.Request) { 18 18 user := currentUser(r) 19 + ctx := r.Context() 19 20 20 21 limit := 20 21 22 22 - likedPageNum, _ := strconv.Atoi(r.URL.Query().Get("liked_page")) 23 - if likedPageNum < 1 { 23 + likedPageNum, err := strconv.Atoi(r.URL.Query().Get("liked_page")) 24 + if err != nil || likedPageNum < 1 { 24 25 likedPageNum = 1 25 26 } 26 - annotPageNum, _ := strconv.Atoi(r.URL.Query().Get("annot_page")) 27 - if annotPageNum < 1 { 27 + annotPageNum, err := strconv.Atoi(r.URL.Query().Get("annot_page")) 28 + if err != nil || annotPageNum < 1 { 28 29 annotPageNum = 1 29 30 } 30 31 31 32 likedPage := Pagination{Page: likedPageNum, PageSize: limit} 32 33 annotPage := Pagination{Page: annotPageNum, PageSize: limit} 33 34 34 - articles, _ := s.db.ListLikedArticles(r.Context(), user.DID, limit+1, likedPage.Offset()) 35 + articles, err := s.dbs.Articles.ListLikedArticles(ctx, user.DID, limit+1, likedPage.Offset()) 36 + if err != nil { 37 + s.logger.Warn("failed to list liked articles", "error", err, "did", user.DID) 38 + } 35 39 likedHasMore := len(articles) > limit 36 40 if likedHasMore { 37 41 articles = articles[:limit] ··· 42 46 likedPage.NextPage = likedPage.Page + 1 43 47 } 44 48 45 - annotations, _ := s.db.ListAnnotations(r.Context(), "", "", user.DID, limit+1, annotPage.Offset()) 49 + annotations, err := s.dbs.Articles.ListAnnotations(ctx, "", "", user.DID, limit+1, annotPage.Offset()) 50 + if err != nil { 51 + s.logger.Warn("failed to list annotations", "error", err, "did", user.DID) 52 + } 46 53 annotHasMore := len(annotations) > limit 47 54 if annotHasMore { 48 55 annotations = annotations[:limit] ··· 65 72 66 73 func (s *Server) handleCreateAnnotation(w http.ResponseWriter, r *http.Request) { 67 74 user := currentUser(r) 75 + ctx := r.Context() 68 76 a := &db.Annotation{ 69 77 AuthorDID: user.DID, 70 78 FeedURL: r.FormValue("feed_url"), ··· 95 103 Tags: tags, 96 104 Rating: 
int(a.Rating.Int64), 97 105 } 98 - uri, cid, err := client.CreateRecord(r.Context(), user.DID, atproto.CollectionAnnotation, record) 106 + uri, cid, err := client.CreateRecord(ctx, user.DID, atproto.CollectionAnnotation, record) 99 107 if err != nil { 100 108 s.logger.Error("failed to write annotation to PDS", "error", err) 101 109 http.Error(w, "failed to write annotation to PDS: "+err.Error(), http.StatusBadGateway) ··· 107 115 a.URI = fmt.Sprintf("glean:annotation:%d", time.Now().UnixNano()) 108 116 } 109 117 110 - if err := s.db.CreateAnnotation(r.Context(), a); err != nil { 118 + if err := s.dbs.Articles.CreateAnnotation(ctx, a); err != nil { 111 119 http.Error(w, err.Error(), http.StatusInternalServerError) 112 120 return 113 121 } ··· 121 129 122 130 func (s *Server) handleDeleteAnnotation(w http.ResponseWriter, r *http.Request) { 123 131 user := currentUser(r) 132 + ctx := r.Context() 124 133 id, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64) 125 134 if err != nil { 126 135 http.Error(w, "invalid id", http.StatusBadRequest) 127 136 return 128 137 } 129 138 130 - annotation, err := s.db.GetAnnotation(r.Context(), id) 139 + annotation, err := s.dbs.Articles.GetAnnotation(ctx, id) 131 140 if err != nil { 132 141 http.Error(w, "annotation not found", http.StatusNotFound) 133 142 return ··· 142 151 if client := s.pdsClientForUser(r); client != nil { 143 152 parsed, ok := atproto.ParseRecordURI(annotation.URI) 144 153 if ok { 145 - if delErr := client.DeleteRecord(r.Context(), user.DID, parsed.Collection, parsed.RKey); delErr != nil { 154 + if delErr := client.DeleteRecord(ctx, user.DID, parsed.Collection, parsed.RKey); delErr != nil { 146 155 s.logger.Error("failed to delete annotation from PDS", "error", delErr) 147 156 } 148 157 } 149 158 } 150 159 } 151 160 152 - if err := s.db.DeleteAnnotation(r.Context(), annotation.URI); err != nil { 161 + if err := s.dbs.Articles.DeleteAnnotation(ctx, annotation.URI); err != nil { 153 162 http.Error(w, 
err.Error(), http.StatusInternalServerError) 154 163 return 155 164 }
+82 -37
internal/server/articles_handler.go
··· 32 32 33 33 func (s *Server) handleArticles(w http.ResponseWriter, r *http.Request) { 34 34 user := currentUser(r) 35 + ctx := r.Context() 35 36 feedURL := r.URL.Query().Get("feed") 36 37 status := r.URL.Query().Get("status") 37 38 searchQuery := r.URL.Query().Get("q") ··· 39 40 page := pageFromRequest(r, 50) 40 41 41 42 if status == "" && searchQuery == "" { 42 - unreadCount, _ := s.db.GetUnreadCount(r.Context(), user.DID, feedURL) 43 + unreadCount, err := s.dbs.Articles.GetUnreadCount(ctx, user.DID, feedURL) 44 + if err != nil { 45 + s.logger.Warn("failed to get unread count", "error", err, "did", user.DID) 46 + } 43 47 if unreadCount > 0 { 44 48 status = "unread" 45 49 } else { ··· 51 55 var err error 52 56 53 57 if searchQuery != "" { 54 - articles, err = s.db.SearchArticles(r.Context(), user.DID, searchQuery, page.Limit()+1, page.Offset()) 58 + articles, err = s.dbs.Articles.SearchArticles(ctx, user.DID, searchQuery, page.Limit()+1, page.Offset()) 55 59 } else { 56 60 switch status { 57 61 case "unread": 58 - articles, err = s.db.ListUnreadArticles(r.Context(), user.DID, feedURL, page.Limit()+1, page.Offset()) 62 + articles, err = s.dbs.Articles.ListUnreadArticles(ctx, user.DID, feedURL, page.Limit()+1, page.Offset()) 59 63 case "read": 60 - articles, err = s.db.ListReadArticles(r.Context(), user.DID, feedURL, page.Limit()+1, page.Offset()) 64 + articles, err = s.dbs.Articles.ListReadArticles(ctx, user.DID, feedURL, page.Limit()+1, page.Offset()) 61 65 default: 62 - articles, err = s.db.ListArticles(r.Context(), user.DID, feedURL, page.Limit()+1, page.Offset()) 66 + articles, err = s.dbs.Articles.ListArticles(ctx, user.DID, feedURL, page.Limit()+1, page.Offset()) 63 67 } 64 68 } 65 69 ··· 88 92 } 89 93 90 94 if feedURL != "" { 91 - if feed, err := s.db.GetFeed(r.Context(), feedURL); err == nil { 95 + if feed, err := s.dbs.Articles.GetFeed(ctx, feedURL); err == nil { 92 96 data["Feed"] = feed 97 + } else { 98 + s.logger.Warn("failed to get feed", "error", 
err, "feed", feedURL) 93 99 } 94 - if _, err := s.db.GetSubscription(r.Context(), user.DID, feedURL); err == nil { 100 + if _, err := s.dbs.Articles.GetSubscription(ctx, user.DID, feedURL); err == nil { 95 101 data["IsSubscribed"] = true 96 102 } else { 97 103 data["IsSubscribed"] = false ··· 108 114 109 115 func (s *Server) handleNewArticleCount(w http.ResponseWriter, r *http.Request) { 110 116 user := currentUser(r) 117 + ctx := r.Context() 111 118 sinceUnix, err := strconv.ParseInt(r.URL.Query().Get("since"), 10, 64) 112 119 if err != nil { 113 120 w.WriteHeader(http.StatusBadRequest) ··· 115 122 } 116 123 since := time.Unix(sinceUnix, 0) 117 124 118 - count, err := s.db.CountNewArticles(r.Context(), user.DID, since) 125 + count, err := s.dbs.Articles.CountNewArticles(ctx, user.DID, since) 119 126 if err != nil { 127 + s.logger.Error("failed to count new articles", "error", err, "did", user.DID) 120 128 w.WriteHeader(http.StatusInternalServerError) 121 129 return 122 130 } ··· 138 146 139 147 func (s *Server) handleArticleDetail(w http.ResponseWriter, r *http.Request) { 140 148 user := currentUser(r) 149 + ctx := r.Context() 141 150 id, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64) 142 151 if err != nil { 143 152 http.Error(w, "invalid id", http.StatusBadRequest) 144 153 return 145 154 } 146 155 147 - article, err := s.db.GetArticle(r.Context(), id) 156 + article, err := s.dbs.Articles.GetArticle(ctx, id) 148 157 if err != nil { 149 158 http.Error(w, "article not found", http.StatusNotFound) 150 159 return 151 160 } 152 161 153 - _ = s.db.MarkArticleRead(r.Context(), user.DID, id) 162 + if err := s.dbs.Articles.MarkArticleRead(ctx, user.DID, id); err != nil { 163 + s.logger.Warn("failed to mark article read", "error", err, "id", id) 164 + } 154 165 155 - readState, _ := s.db.GetReadState(r.Context(), user.DID, id) 166 + readState, err := s.dbs.Articles.GetReadState(ctx, user.DID, id) 167 + if err != nil { 168 + s.logger.Warn("failed to get read state", 
"error", err, "id", id) 169 + } 170 + 171 + var likeCount int 172 + if article.URL.Valid { 173 + likeCount, err = s.dbs.Articles.GetLikeCount(ctx, article.FeedURL, article.URL.String) 174 + if err != nil { 175 + s.logger.Warn("failed to get like count", "error", err, "feed", article.FeedURL) 176 + } 177 + } 156 178 157 - likeCount, _ := s.db.GetLikeCount(r.Context(), article.FeedURL, article.URL.String) 158 179 liked := false 159 180 if article.URL.Valid { 160 - liked, _ = s.db.HasLiked(r.Context(), user.DID, article.FeedURL, article.URL.String) 181 + liked, err = s.dbs.Articles.HasLiked(ctx, user.DID, article.FeedURL, article.URL.String) 182 + if err != nil { 183 + s.logger.Warn("failed to check if liked", "error", err) 184 + } 185 + } 186 + 187 + annotations, err := s.dbs.Articles.ListAnnotations(ctx, "", article.URL.String, "", 20, 0) 188 + if err != nil { 189 + s.logger.Warn("failed to list annotations", "error", err) 190 + } 191 + 192 + feed, err := s.dbs.Articles.GetFeed(ctx, article.FeedURL) 193 + if err != nil { 194 + s.logger.Warn("failed to get feed", "error", err, "feed", article.FeedURL) 161 195 } 162 - annotations, _ := s.db.ListAnnotations(r.Context(), "", article.URL.String, "", 20, 0) 163 - feed, _ := s.db.GetFeed(r.Context(), article.FeedURL) 164 196 165 197 s.render(w, r, "article_detail.html", map[string]any{ 166 198 "User": user, ··· 181 213 http.Error(w, "invalid id", http.StatusBadRequest) 182 214 return 183 215 } 184 - if err := s.db.MarkArticleRead(r.Context(), user.DID, id); err != nil { 216 + if err := s.dbs.Articles.MarkArticleRead(r.Context(), user.DID, id); err != nil { 185 217 http.Error(w, err.Error(), http.StatusInternalServerError) 186 218 return 187 219 } ··· 196 228 http.Error(w, "invalid id", http.StatusBadRequest) 197 229 return 198 230 } 199 - if err := s.db.MarkArticleUnread(r.Context(), user.DID, id); err != nil { 231 + if err := s.dbs.Articles.MarkArticleUnread(r.Context(), user.DID, id); err != nil { 200 232 http.Error(w, 
err.Error(), http.StatusInternalServerError) 201 233 return 202 234 } ··· 206 238 207 239 func (s *Server) handleLikeArticle(w http.ResponseWriter, r *http.Request) { 208 240 user := currentUser(r) 241 + ctx := r.Context() 209 242 id, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64) 210 243 if err != nil { 211 244 http.Error(w, "invalid id", http.StatusBadRequest) 212 245 return 213 246 } 214 247 215 - article, err := s.db.GetArticle(r.Context(), id) 248 + article, err := s.dbs.Articles.GetArticle(ctx, id) 216 249 if err != nil { 217 250 http.Error(w, err.Error(), http.StatusNotFound) 218 251 return 219 252 } 220 253 221 - liked, err := s.db.HasLiked(r.Context(), user.DID, article.FeedURL, article.URL.String) 254 + liked, err := s.dbs.Articles.HasLiked(ctx, user.DID, article.FeedURL, article.URL.String) 222 255 if err != nil { 223 256 http.Error(w, err.Error(), http.StatusInternalServerError) 224 257 return 225 258 } 226 259 227 260 if liked { 228 - existingLike, getErr := s.db.GetLike(r.Context(), user.DID, article.FeedURL, article.URL.String) 261 + existingLike, getErr := s.dbs.Articles.GetLike(ctx, user.DID, article.FeedURL, article.URL.String) 229 262 if getErr != nil { 230 263 http.Error(w, getErr.Error(), http.StatusInternalServerError) 231 264 return ··· 234 267 if client := s.pdsClientForUser(r); client != nil { 235 268 parsed, ok := atproto.ParseRecordURI(existingLike.URI) 236 269 if ok { 237 - if delErr := client.DeleteRecord(r.Context(), user.DID, parsed.Collection, parsed.RKey); delErr != nil { 270 + if delErr := client.DeleteRecord(ctx, user.DID, parsed.Collection, parsed.RKey); delErr != nil { 238 271 s.logger.Error("failed to delete like from PDS", "error", delErr) 239 272 http.Error(w, "failed to delete like from PDS: "+delErr.Error(), http.StatusBadGateway) 240 273 return ··· 242 275 } 243 276 } 244 277 } 245 - if err := s.db.DeleteLikeByUserArticle(r.Context(), user.DID, article.FeedURL, article.URL.String); err != nil { 278 + if err := 
s.dbs.Articles.DeleteLikeByUserArticle(ctx, user.DID, article.FeedURL, article.URL.String); err != nil { 246 279 http.Error(w, err.Error(), http.StatusInternalServerError) 247 280 return 248 281 } ··· 254 287 } 255 288 256 289 if client := s.pdsClientForUser(r); client != nil { 257 - uri, _, err := client.CreateRecord(r.Context(), user.DID, atproto.CollectionLike, likeRecord) 290 + uri, _, err := client.CreateRecord(ctx, user.DID, atproto.CollectionLike, likeRecord) 258 291 if err != nil { 259 292 s.logger.Error("failed to write like to PDS", "error", err) 260 293 http.Error(w, "failed to write like to PDS: "+err.Error(), http.StatusBadGateway) ··· 268 301 ArticleURL: article.URL.String, 269 302 CreatedAt: sql.NullTime{Time: time.Now(), Valid: true}, 270 303 } 271 - if err := s.db.CreateLike(r.Context(), like); err != nil && !errors.Is(err, db.ErrDuplicateLike) { 304 + if err := s.dbs.Articles.CreateLike(ctx, like); err != nil && !errors.Is(err, db.ErrDuplicateLike) { 272 305 http.Error(w, err.Error(), http.StatusInternalServerError) 273 306 return 274 307 } 275 - _ = s.engine.MarkImpressionActed(r.Context(), user.DID, "article", article.URL.String) 276 - sig := s.engine.GetDominantSignal(s.engine.GetWeights(r.Context(), user.DID)) 277 - s.engine.RewardSignal(r.Context(), user.DID, sig) 308 + if err := s.engine.MarkImpressionActed(ctx, user.DID, "article", article.URL.String); err != nil { 309 + s.logger.Warn("failed to mark impression acted", "error", err) 310 + } 311 + sig := s.engine.GetDominantSignal(s.engine.GetWeights(ctx, user.DID)) 312 + s.engine.RewardSignal(ctx, user.DID, sig) 278 313 } else { 279 314 like := &db.Like{ 280 315 URI: fmt.Sprintf("glean:like:%d", time.Now().UnixNano()), ··· 283 318 ArticleURL: article.URL.String, 284 319 CreatedAt: sql.NullTime{Time: time.Now(), Valid: true}, 285 320 } 286 - if err := s.db.CreateLike(r.Context(), like); err != nil && !errors.Is(err, db.ErrDuplicateLike) { 321 + if err := s.dbs.Articles.CreateLike(ctx, like); 
err != nil && !errors.Is(err, db.ErrDuplicateLike) { 287 322 http.Error(w, err.Error(), http.StatusInternalServerError) 288 323 return 289 324 } 290 - _ = s.engine.MarkImpressionActed(r.Context(), user.DID, "article", article.URL.String) 291 - sig := s.engine.GetDominantSignal(s.engine.GetWeights(r.Context(), user.DID)) 292 - s.engine.RewardSignal(r.Context(), user.DID, sig) 325 + if err := s.engine.MarkImpressionActed(ctx, user.DID, "article", article.URL.String); err != nil { 326 + s.logger.Warn("failed to mark impression acted", "error", err) 327 + } 328 + sig := s.engine.GetDominantSignal(s.engine.GetWeights(ctx, user.DID)) 329 + s.engine.RewardSignal(ctx, user.DID, sig) 293 330 } 294 331 } 295 332 296 - likeCount, _ := s.db.GetLikeCount(r.Context(), article.FeedURL, article.URL.String) 333 + likeCount := 0 334 + if article.URL.Valid { 335 + likeCount, err = s.dbs.Articles.GetLikeCount(ctx, article.FeedURL, article.URL.String) 336 + if err != nil { 337 + s.logger.Warn("failed to get like count", "error", err) 338 + } 339 + } 297 340 bordered := r.URL.Query().Get("bordered") == "true" 298 341 writeLikeButton(w, id, !liked, likeCount, bordered) 299 342 } 300 343 301 344 func (s *Server) handleMarkAllRead(w http.ResponseWriter, r *http.Request) { 302 345 user := currentUser(r) 346 + ctx := r.Context() 303 347 feedURL := r.FormValue("feed") 304 348 var err error 305 349 if feedURL != "" { 306 - err = s.db.MarkAllRead(r.Context(), user.DID, feedURL) 350 + err = s.dbs.Articles.MarkAllRead(ctx, user.DID, feedURL) 307 351 } else { 308 - err = s.db.MarkAllSubscribedRead(r.Context(), user.DID) 352 + err = s.dbs.Articles.MarkAllSubscribedRead(ctx, user.DID) 309 353 } 310 354 if err != nil { 311 355 http.Error(w, err.Error(), http.StatusInternalServerError) ··· 316 360 } 317 361 318 362 func (s *Server) handleFetchContent(w http.ResponseWriter, r *http.Request) { 363 + ctx := r.Context() 319 364 id, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64) 320 365 if err != 
nil { 321 366 http.Error(w, "invalid id", http.StatusBadRequest) 322 367 return 323 368 } 324 369 325 - article, err := s.db.GetArticle(r.Context(), id) 370 + article, err := s.dbs.Articles.GetArticle(ctx, id) 326 371 if err != nil { 327 372 http.Error(w, "article not found", http.StatusNotFound) 328 373 return ··· 334 379 return 335 380 } 336 381 337 - content, err := s.scraper.Scrape(r.Context(), article.URL.String) 382 + content, err := s.scraper.Scrape(ctx, article.URL.String) 338 383 if err != nil { 339 384 s.logger.Error("failed to scrape article", "error", err, "url", article.URL.String) 340 385 w.Header().Set("Content-Type", "text/html") ··· 350 395 351 396 cleaned := sanitize.HTML(content) 352 397 353 - if err := s.db.UpdateArticleFullContent(r.Context(), id, cleaned); err != nil { 398 + if err := s.dbs.Articles.UpdateArticleFullContent(ctx, id, cleaned); err != nil { 354 399 s.logger.Error("failed to save full content", "error", err, "id", id) 355 400 } 356 401
+3 -3
internal/server/auth_handler.go
··· 33 33 http.Error(w, "could not resolve handle", http.StatusInternalServerError) 34 34 return 35 35 } 36 - user, createErr := s.db.CreateUser(r.Context(), did, handle, "", "") 36 + user, createErr := s.dbs.Users.CreateUser(r.Context(), did, handle, "", "") 37 37 if createErr != nil { 38 38 http.Error(w, createErr.Error(), http.StatusInternalServerError) 39 39 return ··· 67 67 return 68 68 } 69 69 70 - user, err := s.db.CreateUser(r.Context(), did, handle, "", "") 70 + user, err := s.dbs.Users.CreateUser(r.Context(), did, handle, "", "") 71 71 if err != nil { 72 72 s.logger.Error("failed to create user", "error", err) 73 73 http.Error(w, err.Error(), http.StatusInternalServerError) ··· 102 102 } 103 103 } 104 104 105 - user, err := s.db.CreateUser(r.Context(), did, handle, displayName, avatarURL) 105 + user, err := s.dbs.Users.CreateUser(r.Context(), did, handle, displayName, avatarURL) 106 106 if err != nil { 107 107 s.logger.Error("failed to create user", "error", err) 108 108 http.Error(w, err.Error(), http.StatusInternalServerError)
+41 -9
internal/server/dashboard_handler.go
··· 9 9 10 10 func (s *Server) handleDashboard(w http.ResponseWriter, r *http.Request) { 11 11 user := currentUser(r) 12 + ctx := r.Context() 12 13 13 - unreadCount, _ := s.db.GetUnreadCount(r.Context(), user.DID, "") 14 - subCount, _ := s.db.GetSubscriptionCount(r.Context(), user.DID) 14 + unreadCount, err := s.dbs.Articles.GetUnreadCount(ctx, user.DID, "") 15 + if err != nil { 16 + s.logger.Warn("failed to get unread count", "error", err, "did", user.DID) 17 + } 18 + 19 + subCount, err := s.dbs.Articles.GetSubscriptionCount(ctx, user.DID) 20 + if err != nil { 21 + s.logger.Warn("failed to get subscription count", "error", err, "did", user.DID) 22 + } 15 23 16 24 page := pageFromRequest(r, 25) 17 - articles, _ := s.db.ListUnreadArticles(r.Context(), user.DID, "", page.Limit()+1, page.Offset()) 25 + articles, err := s.dbs.Articles.ListUnreadArticles(ctx, user.DID, "", page.Limit()+1, page.Offset()) 26 + if err != nil { 27 + s.logger.Warn("failed to list unread articles", "error", err, "did", user.DID) 28 + } 18 29 totalFetched := len(articles) 19 30 page = page.Paginate(totalFetched) 20 31 if page.HasNext { 21 32 articles = articles[:page.PageSize] 22 33 } 23 34 24 - articleRecs, _ := s.engine.GetArticleRecommendations(r.Context(), user.DID, 5) 25 - peopleRecs, _ := s.engine.GetPeopleRecommendations(r.Context(), user.DID, 5) 26 - feedRecs, _ := s.engine.GetFeedRecommendations(r.Context(), user.DID, 5) 35 + articleRecs, err := s.engine.GetArticleRecommendations(ctx, user.DID, 5) 36 + if err != nil { 37 + s.logger.Warn("failed to get article recommendations", "error", err, "did", user.DID) 38 + } 39 + 40 + peopleRecs, err := s.engine.GetPeopleRecommendations(ctx, user.DID, 5) 41 + if err != nil { 42 + s.logger.Warn("failed to get people recommendations", "error", err, "did", user.DID) 43 + } 44 + 45 + feedRecs, err := s.engine.GetFeedRecommendations(ctx, user.DID, 5) 46 + if err != nil { 47 + s.logger.Warn("failed to get feed recommendations", "error", err, "did", 
user.DID) 48 + } 27 49 28 50 var impressions []cluster.Impression 29 51 for _, rec := range feedRecs { ··· 33 55 impressions = append(impressions, cluster.Impression{TargetType: "article", TargetID: rec.URL}) 34 56 } 35 57 if len(impressions) > 0 { 36 - _ = s.engine.RecordImpressions(r.Context(), user.DID, impressions) 58 + if err := s.engine.RecordImpressions(ctx, user.DID, impressions); err != nil { 59 + s.logger.Warn("failed to record impressions", "error", err) 60 + } 37 61 } 38 62 39 63 since := time.Now().AddDate(0, 0, -7).Format(time.RFC3339) 40 - personalTrending, _ := s.db.ListTrendingArticlesForUser(r.Context(), user.DID, since, 5, 0) 41 - globalTrending, _ := s.db.ListTrendingArticles(r.Context(), user.DID, since, 10, 0) 64 + 65 + personalTrending, err := s.dbs.Articles.ListTrendingArticlesForUser(ctx, user.DID, since, 5, 0) 66 + if err != nil { 67 + s.logger.Warn("failed to list personal trending", "error", err, "did", user.DID) 68 + } 69 + 70 + globalTrending, err := s.dbs.Articles.ListTrendingArticles(ctx, user.DID, since, 10, 0) 71 + if err != nil { 72 + s.logger.Warn("failed to list global trending", "error", err, "did", user.DID) 73 + } 42 74 43 75 s.render(w, r, "dashboard.html", map[string]any{ 44 76 "User": user,
+75 -27
internal/server/feeds_handler.go
··· 16 16 func (s *Server) handleFeeds(w http.ResponseWriter, r *http.Request) { 17 17 user := currentUser(r) 18 18 category := r.URL.Query().Get("category") 19 + ctx := r.Context() 19 20 20 21 page := pageFromRequest(r, 50) 21 - subs, _ := s.db.ListSubscriptions(r.Context(), user.DID, category, page.Limit()+1, page.Offset()) 22 + subs, err := s.dbs.Articles.ListSubscriptions(ctx, user.DID, category, page.Limit()+1, page.Offset()) 23 + if err != nil { 24 + s.logger.Warn("failed to list subscriptions", "error", err, "did", user.DID) 25 + } 22 26 totalFetched := len(subs) 23 27 page = page.Paginate(totalFetched) 24 28 if page.HasNext { 25 29 subs = subs[:page.PageSize] 26 30 } 27 31 28 - allSubs, _ := s.db.ListSubscriptions(r.Context(), user.DID, "", 1000, 0) 29 - feedRecs, _ := s.engine.GetFeedRecommendations(r.Context(), user.DID, 6) 30 - peopleRecs, _ := s.engine.GetPeopleRecommendations(r.Context(), user.DID, 5) 32 + allSubs, err := s.dbs.Articles.ListSubscriptions(ctx, user.DID, "", 1000, 0) 33 + if err != nil { 34 + s.logger.Warn("failed to list all subscriptions", "error", err, "did", user.DID) 35 + } 36 + 37 + feedRecs, err := s.engine.GetFeedRecommendations(ctx, user.DID, 6) 38 + if err != nil { 39 + s.logger.Warn("failed to get feed recommendations", "error", err, "did", user.DID) 40 + } 41 + 42 + peopleRecs, err := s.engine.GetPeopleRecommendations(ctx, user.DID, 5) 43 + if err != nil { 44 + s.logger.Warn("failed to get people recommendations", "error", err, "did", user.DID) 45 + } 31 46 32 47 if len(feedRecs) > 0 { 33 48 impressions := make([]cluster.Impression, len(feedRecs)) 34 49 for i, rec := range feedRecs { 35 50 impressions[i] = cluster.Impression{TargetType: "feed", TargetID: rec.FeedURL} 36 51 } 37 - _ = s.engine.RecordImpressions(r.Context(), user.DID, impressions) 52 + if err := s.engine.RecordImpressions(ctx, user.DID, impressions); err != nil { 53 + s.logger.Warn("failed to record impressions", "error", err) 54 + } 38 55 } 39 - deadFeeds, _ 
:= s.db.ListDeadFeeds(r.Context(), user.DID, 7) 40 56 41 - categories, _ := s.db.GetCategories(r.Context(), user.DID) 57 + deadFeeds, err := s.dbs.Articles.ListDeadFeeds(ctx, user.DID, 7) 58 + if err != nil { 59 + s.logger.Warn("failed to list dead feeds", "error", err, "did", user.DID) 60 + } 61 + 62 + categories, err := s.dbs.Articles.GetCategories(ctx, user.DID) 63 + if err != nil { 64 + s.logger.Warn("failed to get categories", "error", err, "did", user.DID) 65 + } 42 66 43 67 s.render(w, r, "feeds.html", map[string]any{ 44 68 "User": user, ··· 80 104 if faviconURL == "" { 81 105 go func() { 82 106 if f := feed.ResolveFavicon(context.Background(), feedURL, result.Feed.SiteURL); f != "" { 83 - _ = s.db.UpdateFeedFavicon(context.Background(), feedURL, f) 107 + _ = s.dbs.Articles.UpdateFeedFavicon(context.Background(), feedURL, f) 84 108 } 85 109 }() 86 110 } ··· 94 118 FeedType: nullString(result.Feed.Type), 95 119 FaviconURL: nullString(faviconURL), 96 120 } 97 - if err := s.db.UpsertFeed(r.Context(), f); err != nil { 121 + if err := s.dbs.Articles.UpsertFeed(r.Context(), f); err != nil { 98 122 s.logger.Error("failed to upsert feed", "error", err) 99 123 http.Error(w, err.Error(), http.StatusInternalServerError) 100 124 return ··· 118 142 subCID = cid 119 143 } 120 144 121 - if err := s.db.CreateSubscription(r.Context(), user.DID, feedURL, feedTitle, category, subURI, subCID); err != nil { 145 + if err := s.dbs.Articles.CreateSubscription(r.Context(), user.DID, feedURL, feedTitle, category, subURI, subCID); err != nil { 122 146 if errors.Is(err, db.ErrDuplicateSubscription) { 123 147 http.Error(w, "Already subscribed to this feed.", http.StatusConflict) 124 148 return ··· 128 152 return 129 153 } 130 154 131 - _ = s.engine.MarkImpressionActed(r.Context(), user.DID, "feed", feedURL) 155 + if err := s.engine.MarkImpressionActed(r.Context(), user.DID, "feed", feedURL); err != nil { 156 + s.logger.Warn("failed to mark impression acted", "error", err) 157 + } 132 
158 sig := s.engine.GetDominantSignal(s.engine.GetWeights(r.Context(), user.DID)) 133 159 s.engine.RewardSignal(r.Context(), user.DID, sig) 134 160 135 - sub, _ := s.db.GetSubscription(r.Context(), user.DID, feedURL) 136 - if sub == nil { 161 + sub, err := s.dbs.Articles.GetSubscription(r.Context(), user.DID, feedURL) 162 + if err != nil { 163 + s.logger.Warn("failed to get subscription", "error", err) 137 164 w.WriteHeader(http.StatusNoContent) 138 165 return 139 166 } ··· 156 183 return 157 184 } 158 185 159 - sub, err := s.db.GetSubscription(r.Context(), user.DID, feedURL) 186 + sub, err := s.dbs.Articles.GetSubscription(r.Context(), user.DID, feedURL) 160 187 if err == nil && sub.URI.Valid { 161 188 if client := s.pdsClientForUser(r); client != nil { 162 189 parsed, ok := atproto.ParseRecordURI(sub.URI.String) ··· 170 197 } 171 198 } 172 199 173 - if err := s.db.DeleteSubscription(r.Context(), user.DID, feedURL); err != nil { 200 + if err := s.dbs.Articles.DeleteSubscription(r.Context(), user.DID, feedURL); err != nil { 174 201 s.logger.Error("failed to delete subscription", "error", err) 175 202 http.Error(w, err.Error(), http.StatusInternalServerError) 176 203 return ··· 182 209 func (s *Server) handleClearAllSubscriptions(w http.ResponseWriter, r *http.Request) { 183 210 user := currentUser(r) 184 211 185 - subs, _ := s.db.ListSubscriptions(r.Context(), user.DID, "", 1000, 0) 212 + subs, err := s.dbs.Articles.ListSubscriptions(r.Context(), user.DID, "", 1000, 0) 213 + if err != nil { 214 + s.logger.Warn("failed to list subscriptions", "error", err, "did", user.DID) 215 + } 186 216 if client := s.pdsClientForUser(r); client != nil { 187 217 for _, sub := range subs { 188 218 if sub.URI.Valid { ··· 196 226 } 197 227 } 198 228 199 - if err := s.db.DeleteAllSubscriptions(r.Context(), user.DID); err != nil { 229 + if err := s.dbs.Articles.DeleteAllSubscriptions(r.Context(), user.DID); err != nil { 200 230 s.logger.Error("failed to clear subscriptions", "error", 
err) 201 231 http.Error(w, err.Error(), http.StatusInternalServerError) 202 232 return ··· 231 261 SiteURL: nullString(fu.SiteURL), 232 262 Description: nullString(fu.Description), 233 263 } 234 - if upsertErr := s.db.UpsertFeed(r.Context(), f); upsertErr != nil { 264 + if upsertErr := s.dbs.Articles.UpsertFeed(r.Context(), f); upsertErr != nil { 235 265 s.logger.Error("failed to upsert feed", "error", upsertErr) 236 266 continue 237 267 } 238 268 239 269 go func(feedURL, siteURL string) { 240 270 if fav := feed.ResolveFavicon(context.Background(), feedURL, siteURL); fav != "" { 241 - _ = s.db.UpdateFeedFavicon(context.Background(), feedURL, fav) 271 + if err := s.dbs.Articles.UpdateFeedFavicon(context.Background(), feedURL, fav); err != nil { 272 + s.logger.Warn("failed to update favicon", "error", err, "feed", feedURL) 273 + } 242 274 } 243 275 }(fu.URL, fu.SiteURL) 244 276 ··· 259 291 subCID = cid 260 292 } 261 293 262 - if subErr := s.db.CreateSubscription(r.Context(), user.DID, fu.URL, fu.Title, fu.Category, subURI, subCID); subErr != nil { 294 + if subErr := s.dbs.Articles.CreateSubscription(r.Context(), user.DID, fu.URL, fu.Title, fu.Category, subURI, subCID); subErr != nil { 263 295 if !errors.Is(subErr, db.ErrDuplicateSubscription) { 264 296 s.logger.Error("failed to create subscription", "error", subErr) 265 297 } ··· 274 306 275 307 func (s *Server) handleOPMLDownload(w http.ResponseWriter, r *http.Request) { 276 308 user := currentUser(r) 277 - subs, _ := s.db.ListSubscriptions(r.Context(), user.DID, "", 1000, 0) 309 + subs, err := s.dbs.Articles.ListSubscriptions(r.Context(), user.DID, "", 1000, 0) 310 + if err != nil { 311 + s.logger.Warn("failed to list subscriptions", "error", err, "did", user.DID) 312 + } 278 313 279 314 var feedURLs []feed.FeedURL 280 315 for _, sub := range subs { ··· 298 333 299 334 func (s *Server) handleFeedList(w http.ResponseWriter, r *http.Request) { 300 335 user := currentUser(r) 301 - subs, _ := 
s.db.ListSubscriptions(r.Context(), user.DID, "", 100, 0) 336 + subs, err := s.dbs.Articles.ListSubscriptions(r.Context(), user.DID, "", 100, 0) 337 + if err != nil { 338 + s.logger.Warn("failed to list subscriptions", "error", err, "did", user.DID) 339 + } 302 340 s.render(w, r, "feeds.html", map[string]any{ 303 341 "User": user, 304 342 "Subscriptions": subs, ··· 308 346 func (s *Server) handleRefreshFeeds(w http.ResponseWriter, r *http.Request) { 309 347 user := currentUser(r) 310 348 311 - subs, _ := s.db.ListSubscriptions(r.Context(), user.DID, "", 100, 0) 349 + subs, err := s.dbs.Articles.ListSubscriptions(r.Context(), user.DID, "", 100, 0) 350 + if err != nil { 351 + s.logger.Warn("failed to list subscriptions", "error", err, "did", user.DID) 352 + } 312 353 seen := make(map[string]bool) 313 354 for _, sub := range subs { 314 355 if seen[sub.FeedURL] { ··· 316 357 } 317 358 seen[sub.FeedURL] = true 318 359 319 - f, err := s.db.GetFeed(r.Context(), sub.FeedURL) 360 + f, err := s.dbs.Articles.GetFeed(r.Context(), sub.FeedURL) 320 361 if err != nil { 362 + s.logger.Warn("failed to get feed", "error", err, "feed", sub.FeedURL) 321 363 continue 322 364 } 323 365 ff := &feed.Feed{ ··· 332 374 s.scheduler.FetchFeed(r.Context(), ff) 333 375 } 334 376 335 - subs, _ = s.db.ListSubscriptions(r.Context(), user.DID, "", 100, 0) 377 + subs, err = s.dbs.Articles.ListSubscriptions(r.Context(), user.DID, "", 100, 0) 378 + if err != nil { 379 + s.logger.Warn("failed to list subscriptions", "error", err, "did", user.DID) 380 + } 336 381 s.render(w, r, "feed-list.html", map[string]any{ 337 382 "User": user, 338 383 "Subscriptions": subs, ··· 346 391 return 347 392 } 348 393 349 - f, err := s.db.GetFeed(r.Context(), feedURL) 394 + f, err := s.dbs.Articles.GetFeed(r.Context(), feedURL) 350 395 if err != nil { 351 396 http.Error(w, "feed not found", http.StatusNotFound) 352 397 return ··· 364 409 s.scheduler.FetchFeed(r.Context(), ff) 365 410 366 411 user := currentUser(r) 367 - 
deadFeeds, _ := s.db.ListDeadFeeds(r.Context(), user.DID, 7) 412 + deadFeeds, err := s.dbs.Articles.ListDeadFeeds(r.Context(), user.DID, 7) 413 + if err != nil { 414 + s.logger.Warn("failed to list dead feeds", "error", err, "did", user.DID) 415 + } 368 416 if len(deadFeeds) == 0 { 369 417 w.Header().Set("Content-Type", "text/html") 370 418 w.Write([]byte(""))
+25 -9
internal/server/profile_handler.go
··· 10 10 ) 11 11 12 12 func (s *Server) handleProfile(w http.ResponseWriter, r *http.Request) { 13 + ctx := r.Context() 13 14 param := chi.URLParam(r, "did") 14 15 15 16 var did string 16 17 if strings.HasPrefix(param, "did:") { 17 18 did = param 18 19 } else { 19 - profileUser, err := s.db.GetUserByHandle(r.Context(), param) 20 + profileUser, err := s.dbs.Users.GetUserByHandle(ctx, param) 20 21 if err == nil { 21 22 did = profileUser.DID 22 23 } else { 23 - resolved, err := atproto.ResolveHandle(r.Context(), param) 24 + resolved, err := atproto.ResolveHandle(ctx, param) 24 25 if err != nil { 26 + s.logger.Warn("failed to resolve handle", "error", err, "handle", param) 25 27 http.Error(w, "handle not found", http.StatusNotFound) 26 28 return 27 29 } ··· 29 31 } 30 32 } 31 33 32 - profileUser, err := s.db.GetUser(r.Context(), did) 34 + profileUser, err := s.dbs.Users.GetUser(ctx, did) 33 35 if err != nil { 34 - http.Error(w, err.Error(), http.StatusNotFound) 36 + s.logger.Warn("failed to get user", "error", err, "did", did) 37 + http.Error(w, "user not found", http.StatusNotFound) 35 38 return 36 39 } 37 40 38 41 if !profileUser.AvatarURL.Valid || profileUser.AvatarURL.String == "" { 39 - _, displayName, avatarURL, err := atproto.FetchProfile(r.Context(), did) 42 + _, displayName, avatarURL, err := atproto.FetchProfile(ctx, did) 40 43 if err == nil && avatarURL != "" { 41 - _ = s.db.UpdateUserProfile(r.Context(), did, displayName, avatarURL) 44 + if err := s.dbs.Users.UpdateUserProfile(ctx, did, displayName, avatarURL); err != nil { 45 + s.logger.Warn("failed to update user profile", "error", err, "did", did) 46 + } 42 47 profileUser.DisplayName = nullString(displayName) 43 48 profileUser.AvatarURL = nullString(avatarURL) 44 49 } 45 50 } 46 51 47 - subs, _ := s.db.ListSubscriptions(r.Context(), did, "", 50, 0) 48 - annotations, _ := s.db.ListAnnotations(r.Context(), "", "", did, 50, 0) 49 - subCount, _ := s.db.GetSubscriptionCount(r.Context(), did) 52 + 
subs, err := s.dbs.Articles.ListSubscriptions(ctx, did, "", 50, 0) 53 + if err != nil { 54 + s.logger.Warn("failed to list subscriptions", "error", err, "did", did) 55 + } 56 + 57 + annotations, err := s.dbs.Articles.ListAnnotations(ctx, "", "", did, 50, 0) 58 + if err != nil { 59 + s.logger.Warn("failed to list annotations", "error", err, "did", did) 60 + } 61 + 62 + subCount, err := s.dbs.Articles.GetSubscriptionCount(ctx, did) 63 + if err != nil { 64 + s.logger.Warn("failed to get subscription count", "error", err, "did", did) 65 + } 50 66 51 67 user := currentUser(r) 52 68
+15 -15
internal/server/server.go
··· 56 56 } 57 57 58 58 type Server struct { 59 - db *db.DB 59 + dbs *db.Databases 60 60 router *chi.Mux 61 61 templates *template.Template 62 62 logger *slog.Logger ··· 70 70 callbackURL string 71 71 } 72 72 73 - func New(database *db.DB, clientID, callbackURL, addr string, scheduler *feed.Scheduler, engine *cluster.Engine, logger *slog.Logger) *Server { 74 - oauthStore := db.NewOAuthStore(database) 73 + func New(dbs *db.Databases, clientID, callbackURL, addr string, scheduler *feed.Scheduler, engine *cluster.Engine, logger *slog.Logger) *Server { 74 + oauthStore := db.NewOAuthStore(dbs.Users) 75 75 76 76 var config oauth.ClientConfig 77 77 if clientID == "" { ··· 87 87 oauthClient := oauth.NewClientApp(&config, oauthStore) 88 88 89 89 s := &Server{ 90 - db: database, 90 + dbs: dbs, 91 91 router: chi.NewRouter(), 92 92 logger: logger, 93 93 oauth: oauthClient, ··· 200 200 s.router.Post("/auth/logout", s.handleAuthLogout) 201 201 s.router.Get("/oauth/client-metadata", s.handleOAuthClientMetadata) 202 202 203 - xrpc := atproto.NewXRPCHandler(s.db.DB, s.engine) 203 + xrpc := atproto.NewXRPCHandler(s.dbs.Articles.DB, s.engine) 204 204 s.router.Get("/xrpc/at.glean.listSubscriptions", xrpc.ListSubscriptions) 205 205 s.router.Get("/xrpc/at.glean.listAnnotations", xrpc.ListAnnotations) 206 206 s.router.Get("/xrpc/at.glean.listLikes", xrpc.ListLikes) ··· 382 382 defer cancel() 383 383 384 384 isNewUser := false 385 - if count, err := s.db.GetSubscriptionCount(ctx, userDID); err == nil && count == 0 { 385 + if count, err := s.dbs.Articles.GetSubscriptionCount(ctx, userDID); err == nil && count == 0 { 386 386 isNewUser = true 387 387 } 388 388 389 - sync := atproto.NewSync(s.db, client, s.logger) 389 + sync := atproto.NewSync(s.dbs.Articles, s.dbs.Users, client, s.logger) 390 390 if err := sync.Run(ctx, userDID); err != nil { 391 391 s.logger.Error("background sync failed", "error", err, "did", userDID) 392 392 } ··· 398 398 } 399 399 400 400 func (s *Server) 
refreshUserFeeds(ctx context.Context, userDID string) { 401 - subs, err := s.db.ListSubscriptions(ctx, userDID, "", 1000, 0) 401 + subs, err := s.dbs.Articles.ListSubscriptions(ctx, userDID, "", 1000, 0) 402 402 if err != nil { 403 403 s.logger.Error("failed to list subscriptions for initial fetch", "error", err, "did", userDID) 404 404 return ··· 415 415 } 416 416 seen[sub.FeedURL] = true 417 417 418 - f, err := s.db.GetFeed(ctx, sub.FeedURL) 418 + f, err := s.dbs.Articles.GetFeed(ctx, sub.FeedURL) 419 419 if err != nil { 420 420 continue 421 421 } ··· 458 458 return 459 459 } 460 460 461 - existing, err := s.db.ListUserDIDs(ctx) 461 + existing, err := s.dbs.Users.ListUserDIDs(ctx) 462 462 if err != nil { 463 463 s.logger.Error("failed to list existing users", "error", err) 464 464 return ··· 499 499 avatarURL = avatar 500 500 } 501 501 502 - if _, err := s.db.CreateUser(ctx, did, handle, displayName, avatarURL); err != nil { 502 + if _, err := s.dbs.Users.CreateUser(ctx, did, handle, displayName, avatarURL); err != nil { 503 503 s.logger.Error("failed to create user during backfill", "error", err, "did", did) 504 504 return 505 505 } ··· 511 511 } 512 512 513 513 client := atproto.NewUnauthenticatedClient(pdsURL) 514 - sync := atproto.NewSync(s.db, client, s.logger) 514 + sync := atproto.NewSync(s.dbs.Articles, s.dbs.Users, client, s.logger) 515 515 if err := sync.Run(ctx, did); err != nil { 516 516 s.logger.Error("backfill sync failed", "error", err, "did", did) 517 517 } ··· 525 525 } 526 526 527 527 func (s *Server) runSyncAll(ctx context.Context) { 528 - users, err := s.db.ListUsers(ctx) 528 + users, err := s.dbs.Users.ListUsers(ctx) 529 529 if err != nil { 530 530 s.logger.Error("failed to list users for sync", "error", err) 531 531 return ··· 549 549 } 550 550 551 551 client := atproto.NewClient(sess.APIClient()) 552 - sync := atproto.NewSync(s.db, client, s.logger) 552 + sync := atproto.NewSync(s.dbs.Articles, s.dbs.Users, client, s.logger) 553 553 if err 
:= sync.Run(ctx, u.DID); err != nil { 554 554 metrics.SyncErrors.Inc() 555 555 s.logger.Error("periodic sync failed", "error", err, "did", u.DID) 556 556 } 557 557 558 558 if dn, avatar, err := client.GetProfile(ctx, u.DID); err == nil { 559 - _ = s.db.UpdateUserProfile(ctx, u.DID, dn, avatar) 559 + _ = s.dbs.Users.UpdateUserProfile(ctx, u.DID, dn, avatar) 560 560 } 561 561 562 562 metrics.SyncRuns.Inc()
+1 -1
internal/server/session.go
··· 34 34 return nil 35 35 } 36 36 37 - user, err := s.db.GetUser(r.Context(), data.DID) 37 + user, err := s.dbs.Users.GetUser(r.Context(), data.DID) 38 38 if err != nil { 39 39 return nil 40 40 }
+7 -2
internal/server/trending_handler.go
··· 9 9 10 10 func (s *Server) handleTrending(w http.ResponseWriter, r *http.Request) { 11 11 user := currentUser(r) 12 + ctx := r.Context() 12 13 13 14 scope := r.URL.Query().Get("scope") 14 15 if scope == "for-me" && user == nil { ··· 28 29 } 29 30 30 31 var trending []*db.TrendingItem 32 + var err error 31 33 if scope == "for-me" { 32 - trending, _ = s.db.ListTrendingArticlesForUser(r.Context(), userDID, since, page.Limit()+1, page.Offset()) 34 + trending, err = s.dbs.Articles.ListTrendingArticlesForUser(ctx, userDID, since, page.Limit()+1, page.Offset()) 33 35 } else { 34 - trending, _ = s.db.ListTrendingArticles(r.Context(), userDID, since, page.Limit()+1, page.Offset()) 36 + trending, err = s.dbs.Articles.ListTrendingArticles(ctx, userDID, since, page.Limit()+1, page.Offset()) 37 + } 38 + if err != nil { 39 + s.logger.Warn("failed to list trending articles", "error", err, "scope", scope) 35 40 } 36 41 37 42 totalFetched := len(trending)
+7 -7
main.go
··· 34 34 35 35 logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) 36 36 37 - database, err := db.Open(*dbPath) 37 + dbs, err := db.OpenAll(*dbPath) 38 38 if err != nil { 39 - logger.Error("failed to open database", "error", err) 39 + logger.Error("failed to open databases", "error", err) 40 40 os.Exit(1) 41 41 } 42 - defer database.Close() 42 + defer dbs.Close() 43 43 44 44 clientID := envOr("GLEAN_OAUTH_CLIENT_ID", "") 45 45 callbackURL := envOr("GLEAN_OAUTH_REDIRECT_URL", "") 46 46 47 - storeAdapter := db.NewFeedStoreAdapter(database) 47 + storeAdapter := db.NewFeedStoreAdapter(dbs.Articles) 48 48 scheduler := feed.NewScheduler(storeAdapter, logger, *fetchInterval, 30*time.Minute) 49 49 50 - engine := cluster.NewEngine(database.DB, logger) 50 + engine := cluster.NewEngine(dbs.Users.DB, logger) 51 51 52 - srv := server.New(database, clientID, callbackURL, *addr, scheduler, engine, logger) 52 + srv := server.New(dbs, clientID, callbackURL, *addr, scheduler, engine, logger) 53 53 54 54 cron := cluster.NewCron(engine, *clusterInterval, logger) 55 55 56 - handler := atproto.NewStreamDBHandler(database, logger) 56 + handler := atproto.NewStreamDBHandler(dbs.Articles, dbs.Users, logger) 57 57 jetstream := atproto.NewJetstreamConsumer(*jetstreamURL, handler.Handle, logger) 58 58 59 59 ctx, cancel := context.WithCancel(context.Background())
+1 -1
readme.md
··· 40 40 | Variable | Default | What it does | 41 41 | -------------------------- | -------------------------- | --------------------------------------------------------- | 42 42 | `GLEAN_ADDR` | `:8080` | Listen address | 43 - | `GLEAN_DB` | `glean.db` | SQLite database path | 43 + | `GLEAN_DB` | `glean.db` | SQLite base path (`_users`, `_articles`, `_recs` suffixes) | 44 44 | `GLEAN_JETSTREAM` | `wss://jetstream.glean.at` | Jetstream WebSocket URL | 45 45 | `GLEAN_SYNC_INTERVAL` | `1h` | PDS sync interval (Go duration: `30m`, `2h30m`, etc.) | 46 46 | `GLEAN_CLUSTER_INTERVAL` | `10m` | Cluster recomputation interval (Go duration) |