···
 # Leave empty for localhost OAuth (development)
 # GLEAN_OAUTH_CLIENT_ID=https://glean.at/oauth/client-metadata
 # GLEAN_OAUTH_REDIRECT_URL=https://glean.at/auth/callback
+# Embeddings (recommended; powers content-based feed/article recommendations)
+# Point to any OpenAI-compatible /v1/embeddings endpoint (OpenAI, Ollama, etc.)
+# Without embeddings, recommendations rely only on subscription overlap and social graph.
+GLEAN_EMBED_BASE_URL=https://api.openai.com/v1
+GLEAN_EMBED_API_KEY=sk-...
+GLEAN_EMBED_MODEL=text-embedding-3-small
+GLEAN_EMBED_DIMENSION=1536
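A minimal sketch of how these variables might be read at startup. The `EmbedConfig` struct and `loadEmbedConfig` helper are hypothetical names that do not appear in the diff, and treating an empty `GLEAN_EMBED_BASE_URL` as "embeddings disabled" is an assumption; the 1536 fallback follows the default shown above.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// EmbedConfig is a hypothetical holder for the GLEAN_EMBED_* settings.
type EmbedConfig struct {
	BaseURL   string
	APIKey    string
	Model     string
	Dimension int
}

// loadEmbedConfig reads the settings through getenv (os.Getenv in production).
// It returns ok=false when no base URL is set, which this sketch treats as
// "embeddings disabled" (an assumption, not confirmed by the diff).
func loadEmbedConfig(getenv func(string) string) (cfg EmbedConfig, ok bool, err error) {
	cfg.BaseURL = getenv("GLEAN_EMBED_BASE_URL")
	if cfg.BaseURL == "" {
		return EmbedConfig{}, false, nil
	}
	cfg.APIKey = getenv("GLEAN_EMBED_API_KEY")
	cfg.Model = getenv("GLEAN_EMBED_MODEL")
	cfg.Dimension = 1536 // default from the example configuration
	if raw := getenv("GLEAN_EMBED_DIMENSION"); raw != "" {
		dim, convErr := strconv.Atoi(raw)
		if convErr != nil || dim <= 0 {
			return EmbedConfig{}, false, fmt.Errorf("invalid GLEAN_EMBED_DIMENSION %q", raw)
		}
		cfg.Dimension = dim
	}
	return cfg, true, nil
}

func main() {
	cfg, ok, err := loadEmbedConfig(os.Getenv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	if !ok {
		fmt.Println("embeddings disabled")
		return
	}
	fmt.Printf("embedding with %s (%d dimensions)\n", cfg.Model, cfg.Dimension)
}
```

Passing `getenv` as a parameter keeps the helper testable without touching the process environment.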
···
 | Layer | Technology |
 | ---------------- | --------------------------------------------------------------- |
 | Backend | Go |
-| Database | SQLite (3 files: users, articles, recs via `mattn/go-sqlite3`) |
+| Database | SQLite (3 files: users, articles, recs via `mattn/go-sqlite3` + `sqlite-vec` for vector search) |
 | Frontend | htmx + TailwindCSS |
 | Auth | AT Protocol OAuth / DID resolution (configurable PLC directory) |
 | AT Protocol role | AppView for `at.glean.*` lexicons |
···

 ```sql
 CREATE TABLE users (
-    did        TEXT PRIMARY KEY,
-    indexed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
-    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
+    did           TEXT PRIMARY KEY,
+    indexed_at    DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    updated_at    DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    follows_dirty BOOLEAN NOT NULL DEFAULT 1
 );
 ```

···
 | Subscription | `subscriptions` | 1.0 | Jaccard over subscriber sets between similar users |
 | Like | `likes` | 0.5 | Time-decayed like co-occurrence (30-day half-life) |
 | Tag | `annotations.tags` | 0.3 | Jaccard over annotation tag sets |
-| Social | `follow_distances` | 0.7 | Follow distance: 1-hop=1.0, 2-hop=0.3 |
+| Social | `follow_distances` | 0.7 | Follow distance: 1-hop=1.0, 2-hop=0.3, 3-hop=0.1 |
 | Popularity | `feeds.subscriber_count` | 0.2 | `log(1 + subscribers) / log(1 + max)` |
 | Category | `subscriptions.category` | 0.4 | Boost feeds matching user's existing categories |
+| Content | `article_embeddings` | 0.4 | Cosine similarity via embedding KNN (requires embedder) |
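As an illustration of the Popularity row, the normalisation `log(1 + subscribers) / log(1 + max)` can be sketched as below. `popularitySignal` is a hypothetical helper name, and clamping counts above `max` is an assumption added for robustness.

```go
package main

import (
	"fmt"
	"math"
)

// popularitySignal normalises a feed's subscriber count to [0, 1] using
// log(1 + subscribers) / log(1 + max), where max is the largest subscriber
// count among all feeds. Counts above max are clamped (an assumption).
func popularitySignal(subscribers, maxSubscribers int) float64 {
	if subscribers <= 0 || maxSubscribers <= 0 {
		return 0
	}
	if subscribers > maxSubscribers {
		subscribers = maxSubscribers
	}
	return math.Log1p(float64(subscribers)) / math.Log1p(float64(maxSubscribers))
}

func main() {
	for _, n := range []int{0, 9, 99, 999} {
		fmt.Printf("%4d subscribers -> %.3f\n", n, popularitySignal(n, 999))
	}
}
```

The logarithm compresses large counts, so a feed with ten times more subscribers gains only a fixed increment rather than ten times the score.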

 ### 7.2 Feed Co-occurrence (Jaccard Similarity)

···
 J(A, B) = |subscribers(A) ∩ subscribers(B)| / |subscribers(A) ∪ subscribers(B)|
 ```

-Feed description text similarity is also computed (word overlap after stopword removal) and added as a boost.
+Feed description similarity is also computed via embedding cosine similarity (requires embedder) and added as a boost.
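The Jaccard formula and the embedding boost described in this section can be sketched in memory as follows. This is not the SQL the engine runs: `set`, `jaccard`, `cosine`, and `feedSimilarity` are hypothetical helpers, and both the additive combination and the `descWeight` value are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"math"
)

// set builds a subscriber set from a list of DIDs.
func set(dids ...string) map[string]struct{} {
	s := make(map[string]struct{}, len(dids))
	for _, d := range dids {
		s[d] = struct{}{}
	}
	return s
}

// jaccard returns |A ∩ B| / |A ∪ B| for two subscriber sets.
func jaccard(a, b map[string]struct{}) float64 {
	if len(a) == 0 && len(b) == 0 {
		return 0
	}
	inter := 0
	for did := range a {
		if _, ok := b[did]; ok {
			inter++
		}
	}
	union := len(a) + len(b) - inter
	return float64(inter) / float64(union)
}

// cosine returns the cosine similarity of two equal-length vectors, or 0
// when the lengths differ or either vector has zero norm.
func cosine(a, b []float32) float64 {
	if len(a) != len(b) {
		return 0
	}
	var dot, na, nb float64
	for i := range a {
		dot += float64(a[i]) * float64(b[i])
		na += float64(a[i]) * float64(a[i])
		nb += float64(b[i]) * float64(b[i])
	}
	if na == 0 || nb == 0 {
		return 0
	}
	return dot / (math.Sqrt(na) * math.Sqrt(nb))
}

// feedSimilarity adds a description boost to the subscriber Jaccard when both
// feeds have embeddings. The additive form and descWeight are assumptions.
func feedSimilarity(subsA, subsB map[string]struct{}, embA, embB []float32, descWeight float64) float64 {
	score := jaccard(subsA, subsB)
	if embA != nil && embB != nil {
		score += descWeight * math.Max(0, cosine(embA, embB))
	}
	return score
}

func main() {
	a := set("did:plc:x", "did:plc:y", "did:plc:z")
	b := set("did:plc:y", "did:plc:z", "did:plc:w")
	fmt.Printf("jaccard only: %.2f\n", feedSimilarity(a, b, nil, nil, 0.3))
	fmt.Printf("with boost:   %.2f\n", feedSimilarity(a, b, []float32{3, 4}, []float32{3, 4}, 0.3))
}
```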

 ### 7.3 User Similarity

···
 ```
 score = like_signal * w_like
       + social_signal * w_social
+      + content_signal * w_content
       + recency_signal * 0.2
 ```

+Content signal uses embedding vectors: the user's liked article embeddings are averaged into a single interest vector, then a KNN query against the `article_embeddings` vec0 table finds semantically similar articles. This requires an embedder to be configured; without it, the content signal is 0.
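A small in-memory sketch of the two pieces described above: averaging liked-article embeddings into an interest vector, and combining the signals with the score formula. `interestVector`, `articleScore`, and the `weights` struct are hypothetical names; the weight values in `main` are the defaults listed in the signal table.

```go
package main

import "fmt"

// weights holds the per-user multipliers for the article score.
type weights struct {
	like, social, content float64
}

// interestVector averages the embeddings of a user's liked articles.
// Vectors whose length differs from dim are skipped. It returns nil when
// nothing usable remains, in which case the content signal is 0.
func interestVector(liked [][]float32, dim int) []float32 {
	sum := make([]float32, dim)
	count := 0
	for _, v := range liked {
		if len(v) != dim {
			continue
		}
		for i := range sum {
			sum[i] += v[i]
		}
		count++
	}
	if count == 0 {
		return nil
	}
	for i := range sum {
		sum[i] /= float32(count)
	}
	return sum
}

// articleScore combines the signals as in the formula above; the recency
// multiplier is fixed at 0.2.
func articleScore(like, social, content, recency float64, w weights) float64 {
	return like*w.like + social*w.social + content*w.content + recency*0.2
}

func main() {
	liked := [][]float32{{1, 0}, {0, 1}}
	fmt.Println("interest vector:", interestVector(liked, 2))

	w := weights{like: 0.5, social: 0.7, content: 0.4}
	fmt.Printf("score: %.2f\n", articleScore(0.8, 0.3, 0.6, 1.0, w))
}
```

In the engine, the interest vector would then be serialized and passed to the vec0 KNN query rather than compared in Go.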
+
 ### 7.5 User Feedback (Dismiss)

 Users can dismiss recommendations they don't want to see again:
···
 - `POST /articles/dismiss`: dismiss an article recommendation
 - Dismissals are stored locally in `dismissed_recommendations` (not on PDS)
 - Dismissed items are excluded from all future recommendation queries
-- Auto-dismiss: items shown >15 times over >30 days without action are auto-dismissed
+- Auto-dismiss: items shown ≥5 times over >5 days without action are auto-dismissed

 Impression tracking (`recommendation_impressions`) records how many times each recommendation was shown and whether the user acted on it.
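The auto-dismiss rule can be expressed as a predicate over one impression row. The sketch below mirrors the conditions used by `AutoDismissStale` (not acted on, shown at least N times, first shown before the cutoff); the `impression` struct, `isStale`, and `demoNow` are hypothetical names for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// impression is a simplified, in-memory view of one row of
// recommendation_impressions.
type impression struct {
	ShownCount   int
	FirstShownAt time.Time
	Acted        bool
}

// isStale reports whether an impression qualifies for auto-dismissal:
// never acted on, shown at least minShown times, and first shown more than
// maxAgeDays ago.
func isStale(imp impression, now time.Time, minShown, maxAgeDays int) bool {
	cutoff := now.AddDate(0, 0, -maxAgeDays)
	return !imp.Acted && imp.ShownCount >= minShown && imp.FirstShownAt.Before(cutoff)
}

// demoNow is a fixed reference time so the example is deterministic.
var demoNow = time.Date(2025, time.January, 31, 12, 0, 0, 0, time.UTC)

func main() {
	imps := []impression{
		{ShownCount: 5, FirstShownAt: demoNow.AddDate(0, 0, -6)},
		{ShownCount: 9, FirstShownAt: demoNow.AddDate(0, 0, -4)},
		{ShownCount: 7, FirstShownAt: demoNow.AddDate(0, 0, -10), Acted: true},
	}
	for i, imp := range imps {
		fmt.Printf("impression %d stale: %v\n", i, isStale(imp, demoNow, 5, 5))
	}
}
```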

···

 ### 7.7 Social Graph

-Follow distances (1-hop and 2-hop) are pre-computed in `follow_distances` during the cron job:
+Follow distances (1-hop through 3-hop) are computed incrementally. A `follows_dirty` column on `users` tracks whose follow graph changed since the last cron run. Only dirty users are reprocessed: their existing rows in `follow_distances` are deleted and recomputed via BFS, then the dirty flag is cleared.

 - 1-hop: direct follows (weight 1.0)
 - 2-hop: friends-of-friends (weight 0.3)
-- 3-hop is excluded due to noise and computational cost
+- 3-hop: third-degree connections (weight 0.1)
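The per-user recomputation can be pictured as a bounded breadth-first search. The sketch below works on an in-memory adjacency map rather than the `follows` table; `followDistances` and `hopWeight` are hypothetical names, and the weights are the ones listed above.

```go
package main

import "fmt"

// hopWeight maps a follow distance to its social-signal weight.
var hopWeight = map[int]float64{1: 1.0, 2: 0.3, 3: 0.1}

// followDistances runs a breadth-first search over the follow graph from
// start and returns the shortest distance to every user reachable within
// maxHops. The start user itself is not included.
func followDistances(follows map[string][]string, start string, maxHops int) map[string]int {
	dist := map[string]int{}
	visited := map[string]bool{start: true}
	frontier := []string{start}
	for hop := 1; hop <= maxHops && len(frontier) > 0; hop++ {
		var next []string
		for _, u := range frontier {
			for _, v := range follows[u] {
				if visited[v] {
					continue
				}
				visited[v] = true
				dist[v] = hop
				next = append(next, v)
			}
		}
		frontier = next
	}
	return dist
}

func main() {
	follows := map[string][]string{
		"did:plc:alice": {"did:plc:bob"},
		"did:plc:bob":   {"did:plc:carol", "did:plc:alice"},
		"did:plc:carol": {"did:plc:dave"},
		"did:plc:dave":  {"did:plc:erin"},
	}
	for did, d := range followDistances(follows, "did:plc:alice", 3) {
		fmt.Printf("%s distance=%d weight=%.1f\n", did, d, hopWeight[d])
	}
}
```

With the dirty-flag scheme, only users whose `follows_dirty` flag is set would be passed as `start`, which keeps each cron run proportional to the number of changed follow graphs.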

 ### 7.8 Diversity & Freshness

···

 A background goroutine runs on a configurable schedule (`GLEAN_CLUSTER_INTERVAL`, default 10m):

-1. **Compute feed similarity**: Batch-update `feed_similarity` table (Jaccard over subscriber sets + description similarity)
-2. **Compute user similarity**: Batch-update `user_similarity` table (subscription Jaccard + time-decayed likes + tags + follow boost)
-3. **Compute follow distances**: 1-hop and 2-hop from `follows` table
-4. **Compute signal profiles**: Per-user category/tag/like summaries
-5. **Auto-dismiss stale**: Dismiss items shown >15 times over >30 days without action
+1. **Compute feed embeddings**: Embed new feed descriptions via an OpenAI-compatible API into the `feed_embeddings` table (skipped if no embedder is configured)
+2. **Compute feed similarity**: Batch-update the `feed_similarity` table (Jaccard over subscriber sets + embedding cosine similarity)
+3. **Compute user similarity**: Batch-update the `user_similarity` table (subscription Jaccard + time-decayed likes + tags + follow boost)
+4. **Compute article embeddings**: Embed new articles' full content (`title + summary + full_content + content`) via an OpenAI-compatible API into the `article_embeddings` vec0 table (skipped if no embedder is configured)
+5. **Compute signal profiles**: Per-user category/tag/like summaries
+6. **Compute follow distances**: Incremental BFS for dirty users (1-hop through 3-hop from the `follows` table)
+7. **Auto-dismiss stale**: Dismiss items shown ≥5 times over >5 days without action

 Jetstream ingestion and record indexing happen in a separate persistent goroutine (the Jetstream consumer), not in the cron.
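One tick of the cron can be sketched as below: the tick is skipped when a previous run still holds the lock, and a failing step is recorded without stopping the later steps. The `engine`, `step`, and `runTick` names are hypothetical simplifications, not the types in `cron.go`; `sync.Mutex.TryLock` requires Go 1.18 or later.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errDemo stands in for a real computation failure.
var errDemo = errors.New("demo failure")

// step is one named cron computation.
type step struct {
	name string
	run  func() error
}

// engine is a stripped-down stand-in that only carries the run lock.
type engine struct {
	mu sync.Mutex
}

// runTick executes all steps in order while holding the lock. If another run
// still holds the lock the tick is skipped and ran is false. A failing step
// is recorded and the remaining steps still run.
func (e *engine) runTick(steps []step) (ran bool, failed []string) {
	if !e.mu.TryLock() {
		return false, nil
	}
	defer e.mu.Unlock()
	for _, s := range steps {
		if err := s.run(); err != nil {
			failed = append(failed, s.name)
		}
	}
	return true, failed
}

func main() {
	e := &engine{}
	steps := []step{
		{"feed embeddings", func() error { return nil }},
		{"feed similarity", func() error { return errDemo }},
		{"user similarity", func() error { return nil }},
	}
	ran, failed := e.runTick(steps)
	fmt.Println("ran:", ran, "failed:", failed)
}
```

A real loop would call `runTick` from a `time.Ticker` and return when the context is cancelled.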

-### 7.11 Recommendation Tables (`<base>_recs`)
+### 7.11 User Interaction Tables (`<base>_users`)
+
+Per-user interaction state lives in the users database so that real-time writes (impressions, dismissals) never contend with cron batch writes to the recs database.

 ```sql
 CREATE TABLE dismissed_recommendations (
···
     acted BOOLEAN NOT NULL DEFAULT 0,
     PRIMARY KEY (user_did, target_type, target_id)
 );
+```
+
+### 7.12 Computed Recommendation Tables (`<base>_recs`)
+
+Written exclusively by the cron. No user-facing writes; only reads during on-demand scoring.
+
+```sql
+CREATE TABLE feed_similarity (
+    feed_a TEXT NOT NULL,
+    feed_b TEXT NOT NULL,
+    jaccard REAL NOT NULL,
+    computed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY (feed_a, feed_b),
+    CHECK(feed_a < feed_b)
+);
+
+CREATE TABLE user_similarity (
+    user_a TEXT NOT NULL,
+    user_b TEXT NOT NULL,
+    jaccard REAL NOT NULL,
+    common_feeds INTEGER NOT NULL,
+    common_likes INTEGER NOT NULL DEFAULT 0,
+    common_tags INTEGER NOT NULL DEFAULT 0,
+    computed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    PRIMARY KEY (user_a, user_b),
+    CHECK(user_a < user_b)
+);

 CREATE TABLE follow_distances (
     user_a TEXT NOT NULL,
     user_b TEXT NOT NULL,
-    distance INTEGER NOT NULL CHECK(distance IN (1, 2)),
+    distance INTEGER NOT NULL CHECK(distance IN (1, 2, 3)),
     PRIMARY KEY (user_a, user_b)
 );

···
     w_social REAL NOT NULL DEFAULT 0.7,
     w_pop REAL NOT NULL DEFAULT 0.2,
     w_category REAL NOT NULL DEFAULT 0.4,
+    w_content REAL NOT NULL DEFAULT 0.4,
     updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
 );

···
     total_likes INTEGER NOT NULL DEFAULT 0,
     total_tags INTEGER NOT NULL DEFAULT 0,
     top_categories TEXT,
+    top_tags TEXT,
     updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
 );
 ```

+### 7.13 Embeddings (recommended)
+
+When `GLEAN_EMBED_BASE_URL` is configured, article text and feed descriptions are embedded into vectors stored in `sqlite-vec` virtual tables (`recs.feed_embeddings`, `recs.article_embeddings`). The vec0 extension provides native KNN vector search via `WHERE embedding MATCH ? AND k = ?`, replacing Go-side cosine similarity for large-scale lookups. Without embeddings, recommendations rely only on subscription overlap, like patterns, and social graph, with no content-based signals.
+
+The embedder uses the official `github.com/openai/openai-go` SDK with `option.WithBaseURL()`, so any OpenAI-compatible `/v1/embeddings` endpoint works (OpenAI, Ollama, local inference servers).
+
+vec0 tables are created dynamically at startup with the configured dimension (`GLEAN_EMBED_DIMENSION`, default 1536):
+
+```sql
+CREATE VIRTUAL TABLE recs.feed_embeddings USING vec0(
+    feed_url TEXT PRIMARY KEY,
+    embedding float[1536]
+);
+
+CREATE VIRTUAL TABLE recs.article_embeddings USING vec0(
+    article_id INTEGER PRIMARY KEY,
+    embedding float[1536]
+);
+```
+
+Rather than storing the source text in the vec0 tables themselves, a side table tracks it so that a feed is re-embedded when its description changes:
+
+```sql
+CREATE TABLE recs.feed_embedding_meta (
+    feed_url TEXT PRIMARY KEY,
+    source_text TEXT NOT NULL DEFAULT ''
+);
+```
+
+During cron, `ComputeArticleEmbeddings` embeds new articles in batches (using `title + summary + full_content + content` for maximum semantic coverage) and inserts them into the vec0 table. `ComputeFeedEmbeddings` embeds feed descriptions (`title || ' ' || description`) and re-embeds a feed when its source text changes (detected via `feed_embedding_meta`).
+
+During on-demand article recommendations, the user's liked article embeddings are averaged into an interest vector, then a vec0 KNN query finds the top-200 most semantically similar articles. For cold-start users (<5 subscriptions), their subscribed feed embeddings are averaged and a KNN query finds similar feeds.
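The change detection and batching described above can be sketched without a database. `feedsToEmbed` mirrors the selection logic (non-empty text that is either new or different from the recorded source text) and `batches` mirrors the fixed-size batching. Both are hypothetical helpers operating on in-memory maps, not the SQL the engine runs.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// feedsToEmbed returns the URLs of feeds whose current source text is
// non-empty (ignoring surrounding whitespace) and either has never been
// embedded or differs from the text recorded at the last embedding.
func feedsToEmbed(current, stored map[string]string) []string {
	var urls []string
	for url, text := range current {
		if strings.TrimSpace(text) == "" {
			continue
		}
		if prev, ok := stored[url]; !ok || prev != text {
			urls = append(urls, url)
		}
	}
	sort.Strings(urls) // deterministic order, like ORDER BY feed_url
	return urls
}

// batches splits items into consecutive slices of at most size elements.
// A non-positive size is treated as 1 to avoid an endless loop.
func batches(items []string, size int) [][]string {
	if size <= 0 {
		size = 1
	}
	var out [][]string
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		out = append(out, items[start:end])
	}
	return out
}

func main() {
	current := map[string]string{
		"https://a.example/feed": "A Go news",
		"https://b.example/feed": "B Rust weekly (renamed)",
		"https://c.example/feed": " ",
	}
	stored := map[string]string{
		"https://a.example/feed": "A Go news",
		"https://b.example/feed": "B Rust weekly",
	}
	pending := feedsToEmbed(current, stored)
	for i, b := range batches(pending, 100) {
		fmt.Printf("batch %d: %v\n", i, b)
	}
}
```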
+
 ## 8. HTTP API / htmx Endpoints

 The server renders HTML fragments that htmx swaps into the page. No JSON API needed for the frontend.
···
 │   │   └── metrics.go  # Prometheus metrics definitions
 │   ├── cluster/
 │   │   ├── jaccard.go  # Jaccard similarity computation
+│   │   ├── embed.go  # Embedder interface + OpenAI-compatible implementation
+│   │   ├── article.go  # Article + feed embedding computation, vec0 KNN content boost
 │   │   ├── scoring.go  # Feed + people + article recommendation queries (on-demand)
-│   │   ├── social.go  # Follow-distance computation (1-2 hop)
+│   │   ├── social.go  # Incremental follow-distance computation (1-3 hop, dirty-flag)
 │   │   ├── dismiss.go  # Dismiss + impression tracking
 │   │   ├── weights.go  # Bandit-style signal weight auto-tuning
 │   │   ├── diversity.go  # Post-query domain/category diversity filtering
···

 ```
 Cron (every 10m) ──► Cluster Engine
-                     │
-                     ├─► Compute feed similarity
-                     ├─► Compute user similarity
-                     ├─► Compute follow distances
-                     ├─► Compute signal profiles
-                     └─► Auto-dismiss stale recommendations
+                     │
+                     ├─► Compute feed embeddings (if embedder configured)
+                     ├─► Compute feed similarity
+                     ├─► Compute user similarity
+                     ├─► Compute article embeddings (if embedder configured)
+                     ├─► Compute signal profiles
+                     ├─► Compute follow distances
+                     └─► Auto-dismiss stale recommendations

 Browser ──GET /dashboard──► Server
                             │
···11+package cluster
22+33+import (
44+ "context"
55+ "database/sql"
66+ "fmt"
77+ "log/slog"
88+ "strings"
99+1010+ vec "github.com/asg017/sqlite-vec-go-bindings/cgo"
1111+)
1212+1313+const embedBatchSize = 100
+// ComputeArticleEmbeddings embeds new articles (title, summary, full_content,
+// and content, concatenated) into the article_embeddings vec0 table. Skipped
+// when no embedder is configured. Existing embeddings for deleted articles are
+// cleaned up. Articles already embedded are not re-embedded.
1919+func (e *Engine) ComputeArticleEmbeddings(ctx context.Context) error {
2020+ if e.embedder == nil {
2121+ e.logger.Debug("article embeddings skipped, no embedder")
2222+ return nil
2323+ }
2424+2525+ conn, err := e.db.Conn(ctx)
2626+ if err != nil {
2727+ return err
2828+ }
2929+ defer conn.Close()
3030+3131+ _, err = conn.ExecContext(ctx, `
3232+ DELETE FROM recs.article_embeddings
3333+ WHERE article_id NOT IN (SELECT id FROM articles.articles)
3434+ `)
3535+ if err != nil {
3636+ return fmt.Errorf("clean stale article embeddings: %w", err)
3737+ }
3838+3939+ rows, err := conn.QueryContext(ctx, `
4040+ SELECT a.id, COALESCE(a.title, '') || ' ' || COALESCE(a.summary, '') || ' ' || COALESCE(a.full_content, '') || ' ' || COALESCE(a.content, '')
4141+ FROM articles.articles a
4242+ WHERE (COALESCE(a.title, '') != '' OR COALESCE(a.summary, '') != '' OR COALESCE(a.full_content, '') != '' OR COALESCE(a.content, '') != '')
4343+ AND a.id NOT IN (SELECT article_id FROM recs.article_embeddings)
4444+ ORDER BY a.id
4545+ `)
4646+ if err != nil {
4747+ return err
4848+ }
4949+5050+ type article struct {
5151+ id int64
5252+ text string
5353+ }
5454+ var batch []article
5555+ for rows.Next() {
5656+ var a article
5757+ if err := rows.Scan(&a.id, &a.text); err != nil {
5858+ rows.Close()
5959+ return err
6060+ }
6161+ batch = append(batch, a)
6262+ }
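+	rows.Close()
+	// Surface any error that ended the iteration early.
+	if err := rows.Err(); err != nil {
+		return err
+	}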
6464+6565+ if len(batch) == 0 {
6666+ e.logger.Info("article embeddings up to date")
6767+ return nil
6868+ }
6969+7070+ for i := 0; i < len(batch); i += embedBatchSize {
7171+ end := min(i+embedBatchSize, len(batch))
7272+ sub := batch[i:end]
7373+7474+ texts := make([]string, len(sub))
7575+ for j, a := range sub {
7676+ texts[j] = a.text
7777+ }
7878+7979+ embeddings, err := e.embedder.Embed(ctx, texts)
8080+ if err != nil {
8181+ return fmt.Errorf("embed batch %d: %w", i/embedBatchSize, err)
8282+ }
8383+8484+ tx, err := conn.BeginTx(ctx, nil)
8585+ if err != nil {
8686+ return err
8787+ }
8888+ defer func() { _ = tx.Rollback() }()
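+		// Guard against a provider returning a different number of vectors
+		// than inputs, which would otherwise index past the end of sub.
+		if len(embeddings) != len(sub) {
+			return fmt.Errorf("embed batch %d: got %d embeddings for %d inputs", i/embedBatchSize, len(embeddings), len(sub))
+		}
+		for j, emb := range embeddings {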
9191+ blob, err := vec.SerializeFloat32(emb)
9292+ if err != nil {
9393+ return fmt.Errorf("serialize embedding: %w", err)
9494+ }
9595+ if _, err := tx.ExecContext(ctx,
9696+ `INSERT OR IGNORE INTO recs.article_embeddings(article_id, embedding) VALUES (?, ?)`,
9797+ sub[j].id, blob,
9898+ ); err != nil {
9999+ return fmt.Errorf("insert embedding: %w", err)
100100+ }
101101+ }
102102+103103+ if err := tx.Commit(); err != nil {
104104+ return err
105105+ }
106106+107107+ e.logger.Info("article embeddings batch computed",
108108+ slog.Int("batch", i/embedBatchSize),
109109+ slog.Int("count", len(sub)),
110110+ )
111111+ }
112112+113113+ e.logger.Info("article embeddings computed", slog.Int("total", len(batch)))
114114+ return nil
115115+}
116116+117117+func (e *Engine) populateContentBoost(ctx context.Context, conn *sql.Conn, userDID string) error {
118118+ rows, err := conn.QueryContext(ctx, `
119119+ SELECT a.id FROM articles.likes ul
120120+ JOIN articles.articles a ON a.feed_url = ul.feed_url AND a.url = ul.article_url
121121+ WHERE ul.author_did = ?
122122+ `, userDID)
123123+ if err != nil {
124124+ return err
125125+ }
126126+ var articleIDs []int64
127127+ for rows.Next() {
128128+ var id int64
129129+ if err := rows.Scan(&id); err != nil {
130130+ rows.Close()
131131+ return err
132132+ }
133133+ articleIDs = append(articleIDs, id)
134134+ }
135135+ rows.Close()
136136+137137+ if len(articleIDs) == 0 {
138138+ return nil
139139+ }
140140+141141+ ph := make([]string, len(articleIDs))
142142+ args := make([]any, len(articleIDs))
143143+ for i, id := range articleIDs {
144144+ ph[i] = "?"
145145+ args[i] = id
146146+ }
147147+ embRows, err := conn.QueryContext(ctx,
148148+ fmt.Sprintf("SELECT article_id, embedding FROM recs.article_embeddings WHERE article_id IN (%s)", joinPh(ph)),
149149+ args...,
150150+ )
151151+ if err != nil {
152152+ return err
153153+ }
154154+155155+ dim := e.embedder.Dimension()
156156+ sumVec := make([]float32, dim)
157157+ count := 0
158158+ likedSet := make(map[int64]bool)
159159+ for embRows.Next() {
160160+ var id int64
161161+ var blob []byte
162162+ if err := embRows.Scan(&id, &blob); err != nil {
163163+ embRows.Close()
164164+ return err
165165+ }
166166+ v := deserializeFloat32(blob)
167167+ if len(v) != dim {
168168+ continue
169169+ }
170170+ for j := range sumVec {
171171+ sumVec[j] += v[j]
172172+ }
173173+ count++
174174+ likedSet[id] = true
175175+ }
176176+ embRows.Close()
177177+178178+ if count == 0 {
179179+ return nil
180180+ }
181181+182182+ avgVec := make([]float32, dim)
183183+ for j := range avgVec {
184184+ avgVec[j] = sumVec[j] / float32(count)
185185+ }
186186+187187+ queryBlob, err := vec.SerializeFloat32(avgVec)
188188+ if err != nil {
189189+ return fmt.Errorf("serialize query vector: %w", err)
190190+ }
191191+192192+ const topK = 200
193193+ knnRows, err := conn.QueryContext(ctx, `
194194+ SELECT article_id, distance FROM recs.article_embeddings
195195+ WHERE embedding MATCH ? AND k = ?
196196+ ORDER BY distance
197197+ `, queryBlob, topK+len(likedSet))
198198+ if err != nil {
199199+ return err
200200+ }
201201+202202+ tx, err := conn.BeginTx(ctx, nil)
203203+ if err != nil {
204204+ return err
205205+ }
206206+ defer func() { _ = tx.Rollback() }()
207207+208208+ for knnRows.Next() {
209209+ var id int64
210210+ var dist float64
211211+ if err := knnRows.Scan(&id, &dist); err != nil {
212212+ knnRows.Close()
213213+ return err
214214+ }
215215+ if likedSet[id] {
216216+ continue
217217+ }
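+		// NOTE: vec0 reports L2 distance unless the embedding column is declared
+		// with distance_metric=cosine, so 1 - dist equals cosine similarity only
+		// in that case.
+		score := 1.0 - dist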
219219+ if score <= 0 {
220220+ continue
221221+ }
222222+ if _, err := tx.ExecContext(ctx,
223223+ `INSERT OR IGNORE INTO _content_boost (article_id, score) VALUES (?, ?)`,
224224+ id, score,
225225+ ); err != nil {
226226+ return err
227227+ }
228228+ }
229229+ knnRows.Close()
230230+231231+ return tx.Commit()
232232+}
233233+234234+// ComputeFeedEmbeddings embeds feed descriptions (title + description) into the
235235+// feed_embeddings vec0 table and tracks source text in feed_embedding_meta for
236236+// re-embedding on description change. Skipped when no embedder is configured.
237237+func (e *Engine) ComputeFeedEmbeddings(ctx context.Context) error {
238238+ if e.embedder == nil {
239239+ e.logger.Debug("feed embeddings skipped, no embedder")
240240+ return nil
241241+ }
242242+243243+ conn, err := e.db.Conn(ctx)
244244+ if err != nil {
245245+ return err
246246+ }
247247+ defer conn.Close()
248248+249249+ _, err = conn.ExecContext(ctx, `
250250+ DELETE FROM recs.feed_embeddings
251251+ WHERE feed_url NOT IN (SELECT feed_url FROM articles.feeds)
252252+ `)
253253+ if err != nil {
254254+ return fmt.Errorf("clean stale feed embeddings: %w", err)
255255+ }
256256+ _, err = conn.ExecContext(ctx, `
257257+ DELETE FROM recs.feed_embedding_meta
258258+ WHERE feed_url NOT IN (SELECT feed_url FROM articles.feeds)
259259+ `)
260260+ if err != nil {
+		return fmt.Errorf("clean stale feed embedding meta: %w", err)
262262+ }
263263+264264+ rows, err := conn.QueryContext(ctx, `
265265+ SELECT f.feed_url, COALESCE(f.title, '') || ' ' || COALESCE(f.description, '')
266266+ FROM articles.feeds f
267267+ WHERE (COALESCE(f.title, '') != '' OR COALESCE(f.description, '') != '')
268268+ AND (
269269+ f.feed_url NOT IN (SELECT feed_url FROM recs.feed_embedding_meta)
270270+ OR EXISTS (
271271+ SELECT 1 FROM recs.feed_embedding_meta fm
272272+ WHERE fm.feed_url = f.feed_url
273273+ AND fm.source_text != COALESCE(f.title, '') || ' ' || COALESCE(f.description, '')
274274+ )
275275+ )
276276+ ORDER BY f.feed_url
277277+ `)
278278+ if err != nil {
279279+ return err
280280+ }
281281+282282+ type feed struct {
283283+ url string
284284+ text string
285285+ }
286286+ var batch []feed
287287+ for rows.Next() {
288288+ var f feed
289289+ if err := rows.Scan(&f.url, &f.text); err != nil {
290290+ rows.Close()
291291+ return err
292292+ }
293293+ batch = append(batch, f)
294294+ }
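+	rows.Close()
+	// Surface any error that ended the iteration early.
+	if err := rows.Err(); err != nil {
+		return err
+	}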
296296+297297+ if len(batch) == 0 {
298298+ e.logger.Info("feed embeddings up to date")
299299+ return nil
300300+ }
301301+302302+ for i := 0; i < len(batch); i += embedBatchSize {
303303+ end := min(i+embedBatchSize, len(batch))
304304+ sub := batch[i:end]
305305+306306+ texts := make([]string, len(sub))
307307+ for j, f := range sub {
308308+ texts[j] = f.text
309309+ }
310310+311311+ embeddings, err := e.embedder.Embed(ctx, texts)
312312+ if err != nil {
313313+ return fmt.Errorf("embed feed batch %d: %w", i/embedBatchSize, err)
314314+ }
315315+316316+ tx, err := conn.BeginTx(ctx, nil)
317317+ if err != nil {
318318+ return err
319319+ }
320320+ defer func() { _ = tx.Rollback() }()
321321+322322+ for j, emb := range embeddings {
323323+ blob, err := vec.SerializeFloat32(emb)
324324+ if err != nil {
325325+ return fmt.Errorf("serialize feed embedding: %w", err)
326326+ }
327327+ if _, err := tx.ExecContext(ctx,
328328+ `DELETE FROM recs.feed_embeddings WHERE feed_url = ?`, sub[j].url,
329329+ ); err != nil {
330330+ return fmt.Errorf("delete feed embedding: %w", err)
331331+ }
332332+ if _, err := tx.ExecContext(ctx,
333333+ `INSERT INTO recs.feed_embeddings(feed_url, embedding) VALUES (?, ?)`,
334334+ sub[j].url, blob,
335335+ ); err != nil {
336336+ return fmt.Errorf("insert feed embedding: %w", err)
337337+ }
338338+ if _, err := tx.ExecContext(ctx,
339339+ `INSERT OR REPLACE INTO recs.feed_embedding_meta(feed_url, source_text) VALUES (?, ?)`,
340340+ sub[j].url, sub[j].text,
341341+ ); err != nil {
+				return fmt.Errorf("upsert feed embedding meta: %w", err)
343343+ }
344344+ }
345345+346346+ if err := tx.Commit(); err != nil {
347347+ return err
348348+ }
349349+350350+ e.logger.Info("feed embeddings batch computed",
351351+ slog.Int("batch", i/embedBatchSize),
352352+ slog.Int("count", len(sub)),
353353+ )
354354+ }
355355+356356+ e.logger.Info("feed embeddings computed", slog.Int("total", len(batch)))
357357+ return nil
358358+}
359359+360360+func (e *Engine) ensureContentBoostTable(ctx context.Context, conn *sql.Conn) error {
361361+ _, err := conn.ExecContext(ctx, `CREATE TEMP TABLE IF NOT EXISTS _content_boost (article_id INT PRIMARY KEY, score REAL)`)
362362+ if err != nil {
363363+ return err
364364+ }
365365+ _, err = conn.ExecContext(ctx, `DELETE FROM _content_boost`)
366366+ return err
367367+}
368368+369369+func joinPh(ph []string) string {
370370+ var s strings.Builder
371371+ for i, p := range ph {
372372+ if i > 0 {
373373+ s.WriteString(",")
374374+ }
375375+ s.WriteString(p)
376376+ }
377377+ return s.String()
378378+}
+15-3
internal/cluster/cron.go
···88 "pkg.rbrt.fr/glean/internal/metrics"
99)
10101111+// Cron periodically runs all cluster engine computations (similarity, embeddings,
1212+// follow distances, signal profiles, auto-dismiss) on a fixed interval.
1113type Cron struct {
1214 engine *Engine
1315 interval time.Duration
1416 logger *slog.Logger
1517}
16181919+// NewCron creates a new cron runner with the given engine and interval.
1720func NewCron(engine *Engine, interval time.Duration, logger *slog.Logger) *Cron {
1821 return &Cron{engine: engine, interval: interval, logger: logger}
1922}
20232424+// Run starts the cron loop. It blocks until ctx is cancelled. Each tick runs
2525+// all computations sequentially; if a previous run is still in progress the
2626+// tick is skipped.
2127func (c *Cron) Run(ctx context.Context) error {
2228 for {
2329 c.logger.Info("starting similarity computation")
···2632 if !c.engine.mu.TryLock() {
2733 c.logger.Info("skipping computation: already in progress")
2834 } else {
3535+ if err := c.engine.ComputeFeedEmbeddings(ctx); err != nil {
3636+ c.engine.logger.Error("feed embeddings failed", "error", err)
3737+ }
2938 if err := c.engine.ComputeFeedSimilarity(ctx); err != nil {
3039 c.engine.logger.Error("feed similarity failed", "error", err)
3140 }
3241 if err := c.engine.ComputeUserSimilarity(ctx); err != nil {
3342 c.engine.logger.Error("user similarity failed", "error", err)
3443 }
3535- if err := c.engine.ComputeFollowDistances(ctx); err != nil {
3636- c.engine.logger.Error("follow distances failed", "error", err)
4444+ if err := c.engine.ComputeArticleEmbeddings(ctx); err != nil {
4545+ c.engine.logger.Error("article embeddings failed", "error", err)
3746 }
3847 if err := c.engine.ComputeSignalProfiles(ctx); err != nil {
3948 c.engine.logger.Error("signal profiles failed", "error", err)
4049 }
4141- if err := c.engine.AutoDismissStale(ctx, 15, 30); err != nil {
5050+ if err := c.engine.ComputeFollowDistances(ctx); err != nil {
5151+ c.logger.Error("follow distances failed", "error", err)
5252+ }
5353+ if err := c.engine.AutoDismissStale(ctx, 5, 5); err != nil {
4254 c.engine.logger.Error("auto dismiss failed", "error", err)
4355 }
4456 c.engine.mu.Unlock()
+11-11
internal/cluster/dismiss.go
···55 "time"
66)
7788+// Impression records that a recommendation was shown to a user.
89type Impression struct {
910 TargetType string
1011 TargetID string
1112}
12131314func (e *Engine) DismissFeed(ctx context.Context, userDID, feedURL, reason string) error {
1414- return nil
1515 _, err := e.db.ExecContext(ctx, `
1616- INSERT INTO recs.dismissed_recommendations (user_did, target_type, target_id, reason)
1616+ INSERT INTO main.dismissed_recommendations (user_did, target_type, target_id, reason)
1717 VALUES (?, 'feed', ?, ?)
1818 ON CONFLICT(user_did, target_type, target_id) DO UPDATE SET reason = excluded.reason, dismissed_at = CURRENT_TIMESTAMP
1919 `, userDID, feedURL, reason)
···2121}
22222323func (e *Engine) DismissArticle(ctx context.Context, userDID, articleURL, reason string) error {
2424- return nil
2524 _, err := e.db.ExecContext(ctx, `
2626- INSERT INTO recs.dismissed_recommendations (user_did, target_type, target_id, reason)
2525+ INSERT INTO main.dismissed_recommendations (user_did, target_type, target_id, reason)
2726 VALUES (?, 'article', ?, ?)
2827 ON CONFLICT(user_did, target_type, target_id) DO UPDATE SET reason = excluded.reason, dismissed_at = CURRENT_TIMESTAMP
2928 `, userDID, articleURL, reason)
···3130}
32313332func (e *Engine) RecordImpressions(ctx context.Context, userDID string, impressions []Impression) error {
3434- return nil
3533 tx, err := e.db.BeginTx(ctx, nil)
3634 if err != nil {
3735 return err
···40384139 for _, imp := range impressions {
4240 _, err := tx.ExecContext(ctx, `
4343- INSERT INTO recs.recommendation_impressions (user_did, target_type, target_id, first_shown_at, last_shown_at, shown_count)
4141+ INSERT INTO main.recommendation_impressions (user_did, target_type, target_id, first_shown_at, last_shown_at, shown_count)
4442 VALUES (?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 1)
4543 ON CONFLICT(user_did, target_type, target_id) DO UPDATE SET
4644 last_shown_at = CURRENT_TIMESTAMP,
···5452}
55535654func (e *Engine) MarkImpressionActed(ctx context.Context, userDID, targetType, targetID string) error {
5757- return nil
5855 _, err := e.db.ExecContext(ctx, `
5959- UPDATE recs.recommendation_impressions SET acted = 1
5656+ UPDATE main.recommendation_impressions SET acted = 1
6057 WHERE user_did = ? AND target_type = ? AND target_id = ?
6158 `, userDID, targetType, targetID)
6259 return err
6360}
64616262+// AutoDismissStale marks recommendations as dismissed if they were shown at
6363+// least minShownCount times over more than maxAgeDays without the user acting
6464+// on them.
6565func (e *Engine) AutoDismissStale(ctx context.Context, minShownCount int, maxAgeDays int) error {
6666 cutoff := time.Now().AddDate(0, 0, -maxAgeDays).Format(time.RFC3339)
67676868 _, err := e.db.ExecContext(ctx, `
6969- INSERT OR IGNORE INTO recs.dismissed_recommendations (user_did, target_type, target_id, reason, dismissed_at)
6969+ INSERT OR IGNORE INTO main.dismissed_recommendations (user_did, target_type, target_id, reason, dismissed_at)
7070 SELECT user_did, target_type, target_id, 'auto_stale', CURRENT_TIMESTAMP
7171- FROM recs.recommendation_impressions
7171+ FROM main.recommendation_impressions
7272 WHERE acted = 0
7373 AND shown_count >= ?
7474 AND first_shown_at < ?
···7979func (e *Engine) IsFeedDismissed(ctx context.Context, userDID, feedURL string) (bool, error) {
8080 var count int
8181 err := e.db.QueryRowContext(ctx, `
8282- SELECT COUNT(1) FROM recs.dismissed_recommendations
8282+ SELECT COUNT(1) FROM main.dismissed_recommendations
8383 WHERE user_did = ? AND target_type = 'feed' AND target_id = ?
8484 `, userDID, feedURL).Scan(&count)
8585 return count > 0, err
+3
internal/cluster/diversity.go
···88const maxPerDomain = 2
99const maxPerCategory = 3
10101111+// ApplyDiversity filters candidates to limit how many feeds come from the same
1212+// domain (max 2) or category (max 3). Candidates are assumed to be sorted by
1313+// score descending.
1114func ApplyDiversity(candidates []*FeedRecommendation, topN int) []*FeedRecommendation {
1215 domainCount := make(map[string]int, len(candidates))
1316 categoryCount := make(map[string]int)
+80
internal/cluster/embed.go
···11+package cluster
22+33+import (
44+ "bytes"
55+ "context"
66+ "encoding/binary"
77+88+ "github.com/openai/openai-go"
99+ "github.com/openai/openai-go/option"
1010+)
1111+1212+// Embedder generates vector embeddings for text inputs. Implementations must be
1313+// safe for concurrent use.
1414+type Embedder interface {
1515+ Embed(ctx context.Context, texts []string) ([][]float32, error)
1616+ Dimension() int
1717+}
1818+1919+type OpenAIEmbedder struct {
2020+ client openai.Client
2121+ model string
2222+ dimension int
2323+}
2424+2525+type OpenAIEmbedderConfig struct {
2626+ BaseURL string
2727+ APIKey string
2828+ Model string
2929+ Dimension int
3030+}
3131+3232+func NewOpenAIEmbedder(cfg OpenAIEmbedderConfig) *OpenAIEmbedder {
3333+ opts := []option.RequestOption{}
3434+ if cfg.BaseURL != "" {
3535+ opts = append(opts, option.WithBaseURL(cfg.BaseURL))
3636+ }
3737+ if cfg.APIKey != "" {
3838+ opts = append(opts, option.WithAPIKey(cfg.APIKey))
3939+ }
4040+ return &OpenAIEmbedder{
4141+ client: openai.NewClient(opts...),
4242+ model: cfg.Model,
4343+ dimension: cfg.Dimension,
4444+ }
4545+}
4646+4747+func (e *OpenAIEmbedder) Dimension() int {
4848+ return e.dimension
4949+}
5050+5151+func (e *OpenAIEmbedder) Embed(ctx context.Context, texts []string) ([][]float32, error) {
5252+ resp, err := e.client.Embeddings.New(ctx, openai.EmbeddingNewParams{
5353+ Model: e.model,
5454+ Input: openai.EmbeddingNewParamsInputUnion{
5555+ OfArrayOfStrings: texts,
5656+ },
5757+ })
5858+ if err != nil {
5959+ return nil, err
6060+ }
6161+6262+ embeddings := make([][]float32, len(resp.Data))
6363+ for i, d := range resp.Data {
6464+ embeddings[i] = make([]float32, len(d.Embedding))
6565+ for j, v := range d.Embedding {
6666+ embeddings[i][j] = float32(v)
6767+ }
6868+ }
6969+ return embeddings, nil
7070+}
7171+7272+func deserializeFloat32(data []byte) []float32 {
7373+ if len(data)%4 != 0 {
7474+ return nil
7575+ }
7676+ result := make([]float32, len(data)/4)
7777+ r := bytes.NewReader(data)
7878+ _ = binary.Read(r, binary.LittleEndian, &result)
7979+ return result
8080+}
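A round-trip sketch of the blob format that `deserializeFloat32` reads: consecutive little-endian float32 values. The `serializeFloat32` function below is a standard-library stand-in for the sqlite-vec helper, written only for illustration, and this variant of `deserializeFloat32` returns an error instead of silently discarding a malformed blob (a design alternative, not the code in the diff).

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// serializeFloat32 encodes a vector as consecutive little-endian float32
// values. It is a standard-library stand-in for the sqlite-vec helper.
func serializeFloat32(v []float32) ([]byte, error) {
	buf := new(bytes.Buffer)
	if err := binary.Write(buf, binary.LittleEndian, v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// deserializeFloat32 decodes such a blob, returning an error for input whose
// length is not a multiple of 4 bytes.
func deserializeFloat32(data []byte) ([]float32, error) {
	if len(data)%4 != 0 {
		return nil, fmt.Errorf("blob length %d is not a multiple of 4", len(data))
	}
	out := make([]float32, len(data)/4)
	if err := binary.Read(bytes.NewReader(data), binary.LittleEndian, out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	blob, err := serializeFloat32([]float32{1.5, -2, 0.25})
	if err != nil {
		fmt.Println("serialize:", err)
		return
	}
	vec, err := deserializeFloat32(blob)
	if err != nil {
		fmt.Println("deserialize:", err)
		return
	}
	fmt.Printf("%d bytes -> %v\n", len(blob), vec)
}
```

Returning the error lets a caller distinguish a corrupt blob from a dimension mismatch, which the `len(v) != dim` check in `populateContentBoost` currently treats the same way.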
+364-284
internal/cluster/jaccard.go
···55 "database/sql"
66 "fmt"
77 "log/slog"
88+ "math"
89 "sync"
910)
10111212+// Config controls weights used during similarity computation (feed similarity
1313+// embedding boost, user similarity like/tag/follow contributions). These are
1414+// distinct from SignalWeights which control on-demand scoring multipliers.
1115type Config struct {
1216 FollowBoost float64
1317 LikesWeight float64
···1519 DescriptionWeight float64
1620}
17212222+// DefaultConfig returns the default similarity computation weights.
1823func DefaultConfig() Config {
1924 return Config{
2025 FollowBoost: 0.5,
···2429 }
2530}
26312727-// Engine uses *sql.DB directly because it performs cross-schema transactions
2828-// across main, articles, and recs. Typed stores would add overhead without benefit here.
3232+// Engine is the recommendation engine. It holds a reference to the SQLite
3333+// database, an optional embedder for content-based signals, and configuration.
3434+// All public methods are safe for concurrent use (cron writes, on-demand reads).
2935type Engine struct {
3030- db *sql.DB
3131- logger *slog.Logger
3232- mu sync.Mutex
3333- config Config
3636+ db *sql.DB
3737+ logger *slog.Logger
3838+ mu sync.Mutex
3939+ config Config
4040+ embedder Embedder
3441}
35423636-func NewEngine(db *sql.DB, logger *slog.Logger) *Engine {
3737- return &Engine{db: db, logger: logger, config: DefaultConfig()}
4343+// NewEngine creates a new recommendation engine. Pass nil for embedder to
4444+// disable content-based signals (no embedding computation, no KNN queries).
4545+func NewEngine(db *sql.DB, embedder Embedder, logger *slog.Logger) *Engine {
4646+ return &Engine{db: db, logger: logger, config: DefaultConfig(), embedder: embedder}
3847}
39484949+// ComputeFeedSimilarity recomputes the feed_similarity table: time-decayed
5050+// subscriber Jaccard for all feed pairs with shared subscribers, plus an
5151+// embedding cosine similarity boost for pairs that both have embeddings.
4052func (e *Engine) ComputeFeedSimilarity(ctx context.Context) error {
4141- tx, err := e.db.BeginTx(ctx, nil)
5353+ conn, err := e.db.Conn(ctx)
4254 if err != nil {
4355 return err
4456 }
4545- defer func() { _ = tx.Rollback() }()
5757+ defer conn.Close()
46584747- if _, err := tx.ExecContext(ctx, `DELETE FROM recs.feed_similarity`); err != nil {
4848- return err
4949- }
5959+ {
6060+ tx, err := conn.BeginTx(ctx, nil)
6161+ if err != nil {
6262+ return err
6363+ }
6464+ defer func() { _ = tx.Rollback() }()
50655151- _, err = tx.ExecContext(ctx, `
5252- INSERT INTO recs.feed_similarity (feed_a, feed_b, jaccard)
5353- SELECT
5454- s1.feed_url,
5555- s2.feed_url,
5656- CAST(COUNT(*) AS REAL) / (f1.subscriber_count + f2.subscriber_count - CAST(COUNT(*) AS REAL))
5757- FROM articles.subscriptions s1
5858- JOIN articles.subscriptions s2 ON s1.user_did = s2.user_did AND s1.feed_url < s2.feed_url
5959- JOIN articles.feeds f1 ON f1.feed_url = s1.feed_url
6060- JOIN articles.feeds f2 ON f2.feed_url = s2.feed_url
6161- GROUP BY s1.feed_url, s2.feed_url
6262- `)
6363- if err != nil {
6464- return err
6565- }
6666+ if _, err := tx.ExecContext(ctx, `CREATE TEMP TABLE IF NOT EXISTS _feed_sim_staging (
6767+ feed_a TEXT NOT NULL, feed_b TEXT NOT NULL, jaccard REAL NOT NULL,
6868+ PRIMARY KEY (feed_a, feed_b))`); err != nil {
6969+ return err
7070+ }
7171+ if _, err := tx.ExecContext(ctx, `DELETE FROM _feed_sim_staging`); err != nil {
7272+ return err
7373+ }
66746767- if err := e.computeDescriptionSimilarity(ctx, tx); err != nil {
6868- e.logger.Warn("description similarity failed", "error", err)
7575+ _, err = tx.ExecContext(ctx, `
7676+ INSERT INTO _feed_sim_staging (feed_a, feed_b, jaccard)
7777+ SELECT
7878+ s1.feed_url,
7979+ s2.feed_url,
8080+ SUM(EXP(-0.023 * CAST(julianday('now') - julianday(MIN(s1.added_at, s2.added_at)) AS REAL)))
8181+ / (f1.subscriber_count + f2.subscriber_count - CAST(COUNT(*) AS REAL))
8282+ FROM articles.subscriptions s1
8383+ JOIN articles.subscriptions s2 ON s1.user_did = s2.user_did AND s1.feed_url < s2.feed_url
8484+ JOIN articles.feeds f1 ON f1.feed_url = s1.feed_url
8585+ JOIN articles.feeds f2 ON f2.feed_url = s2.feed_url
8686+ WHERE s1.added_at IS NOT NULL AND s2.added_at IS NOT NULL
8787+ GROUP BY s1.feed_url, s2.feed_url
8888+ `)
8989+ if err != nil {
9090+ return err
9191+ }
9292+9393+ if err := e.computeEmbeddingSimilarity(ctx, tx); err != nil {
9494+ e.logger.Warn("embedding similarity failed", "error", err)
9595+ }
9696+9797+ if err := tx.Commit(); err != nil {
9898+ return err
9999+ }
69100 }
701017171- e.logger.Info("feed similarity computed")
7272- return tx.Commit()
7373-}
102102+ {
103103+ tx, err := conn.BeginTx(ctx, nil)
104104+ if err != nil {
105105+ return err
106106+ }
107107+ defer func() { _ = tx.Rollback() }()
108108+109109+ if _, err := tx.ExecContext(ctx, `DELETE FROM recs.feed_similarity`); err != nil {
110110+ return err
111111+ }
112112+ if _, err := tx.ExecContext(ctx, `INSERT INTO recs.feed_similarity (feed_a, feed_b, jaccard) SELECT feed_a, feed_b, jaccard FROM _feed_sim_staging`); err != nil {
113113+ return err
114114+ }
741157575-func (e *Engine) computeDescriptionSimilarity(ctx context.Context, tx *sql.Tx) error {
7676- if _, err := tx.ExecContext(ctx, `CREATE TEMP TABLE IF NOT EXISTS _feed_words (feed_url TEXT, word TEXT)`); err != nil {
7777- return err
116116+ e.logger.Info("feed similarity computed")
117117+ return tx.Commit()
78118 }
7979- if _, err := tx.ExecContext(ctx, `DELETE FROM _feed_words`); err != nil {
8080- return err
119119+}
120120+121121+func (e *Engine) computeEmbeddingSimilarity(ctx context.Context, tx *sql.Tx) error {
122122+ if e.embedder == nil {
123123+ return nil
81124 }
125125+ e.logger.Debug("computing embedding similarity")
821268383- _, err := tx.ExecContext(ctx, `
8484- INSERT INTO _feed_words (feed_url, word)
8585- WITH feed_tokens AS (
8686- SELECT feed_url, LOWER(TRIM(value)) AS word
8787- FROM articles.feeds,
8888- json_each('["' || REPLACE(LOWER(COALESCE(description, '')), ' ', '","') || '"]')
8989- WHERE description IS NOT NULL AND description != ''
9090- )
9191- SELECT feed_url, word FROM feed_tokens
9292- WHERE LENGTH(word) > 3
9393- AND word NOT IN ('about','also','been','being','both','could','every','from','have','here',
9494- 'into','just','like','more','much','must','other','over','some','such','than','that',
9595- 'their','them','then','there','these','they','this','through','very','what','when',
9696- 'where','which','while','will','with','your','most','updated','latest','posts',
9797- 'news','blog','feed','reading','read','articles','article','weekly','daily',
9898- 'monthly','personal','thoughts','views','opinions','writing','write','written')
9999- `)
127127+ stagingRows, err := tx.QueryContext(ctx, `SELECT feed_a, feed_b FROM _feed_sim_staging`)
100128 if err != nil {
101129 return err
102130 }
103131104104- if _, err := tx.ExecContext(ctx, `
105105- CREATE TEMP TABLE IF NOT EXISTS _feed_word_counts (feed_url TEXT PRIMARY KEY, cnt INT)
106106- `); err != nil {
107107- return err
108108- }
109109- if _, err := tx.ExecContext(ctx, `DELETE FROM _feed_word_counts`); err != nil {
110110- return err
111111- }
112112- if _, err := tx.ExecContext(ctx, `
113113- INSERT INTO _feed_word_counts (feed_url, cnt)
114114- SELECT feed_url, COUNT(DISTINCT word) FROM _feed_words GROUP BY feed_url
115115- `); err != nil {
116116- return err
132132+ type pair struct{ a, b string }
133133+ var pairs []pair
134134+ feedSet := make(map[string]bool)
135135+ for stagingRows.Next() {
136136+ var p pair
137137+ if err := stagingRows.Scan(&p.a, &p.b); err != nil {
138138+ stagingRows.Close()
139139+ return err
140140+ }
141141+ pairs = append(pairs, p)
142142+ feedSet[p.a] = true
143143+ feedSet[p.b] = true
117144 }
145145+ stagingRows.Close()
118146119119- if _, err := tx.ExecContext(ctx, `
120120- CREATE TEMP TABLE IF NOT EXISTS _word_overlap (feed_a TEXT, feed_b TEXT, common INT)
121121- `); err != nil {
122122- return err
147147+ if len(feedSet) == 0 {
148148+ return nil
123149 }
124124- if _, err := tx.ExecContext(ctx, `DELETE FROM _word_overlap`); err != nil {
125125- return err
150150+151151+ ph := make([]string, 0, len(feedSet))
152152+ args := make([]any, 0, len(feedSet))
153153+ for url := range feedSet {
154154+ ph = append(ph, "?")
155155+ args = append(args, url)
126156 }
127127- if _, err := tx.ExecContext(ctx, `
128128- INSERT INTO _word_overlap (feed_a, feed_b, common)
129129- SELECT w1.feed_url, w2.feed_url, COUNT(DISTINCT w1.word)
130130- FROM _feed_words w1
131131- JOIN _feed_words w2 ON w1.word = w2.word AND w1.feed_url < w2.feed_url
132132- GROUP BY w1.feed_url, w2.feed_url
133133- HAVING COUNT(DISTINCT w1.word) > 1
134134- `); err != nil {
157157+ embRows, err := tx.QueryContext(ctx,
158158+ fmt.Sprintf("SELECT feed_url, embedding FROM recs.feed_embeddings WHERE feed_url IN (%s)", joinPh(ph)),
159159+ args...,
160160+ )
161161+ if err != nil {
135162 return err
136163 }
137164138138- if _, err := tx.ExecContext(ctx, `
139139- CREATE INDEX IF NOT EXISTS _idx_feed_words_word ON _feed_words(word, feed_url)
140140- `); err != nil {
141141- return err
165165+ embeddings := make(map[string][]float32)
166166+ for embRows.Next() {
167167+ var url string
168168+ var blob []byte
169169+ if err := embRows.Scan(&url, &blob); err != nil {
170170+ embRows.Close()
171171+ return err
172172+ }
173173+ v := deserializeFloat32(blob)
174174+ if len(v) > 0 {
175175+ embeddings[url] = v
176176+ }
142177 }
143143-144144- descInsert := `
145145- INSERT OR IGNORE INTO recs.feed_similarity (feed_a, feed_b, jaccard)
146146- SELECT feed_a, feed_b, 0 FROM _word_overlap
147147- `
148148- if _, err := tx.ExecContext(ctx, descInsert); err != nil {
149149- return err
150150- }
151151-152152- descUpdate := fmt.Sprintf(`
153153- UPDATE recs.feed_similarity SET
154154- jaccard = jaccard + %g * CAST(_word_overlap.common AS REAL) / NULLIF(
155155- (SELECT cnt FROM _feed_word_counts WHERE feed_url = recs.feed_similarity.feed_a) +
156156- (SELECT cnt FROM _feed_word_counts WHERE feed_url = recs.feed_similarity.feed_b) -
157157- CAST(_word_overlap.common AS REAL),
158158- 0
159159- )
160160- FROM _word_overlap
161161- WHERE recs.feed_similarity.feed_a = _word_overlap.feed_a
162162- AND recs.feed_similarity.feed_b = _word_overlap.feed_b
163163- `, e.config.DescriptionWeight)
178178+ embRows.Close()
164179165165- if _, err := tx.ExecContext(ctx, descUpdate); err != nil {
166166- return err
180180+ for _, p := range pairs {
181181+ vecA, okA := embeddings[p.a]
182182+ vecB, okB := embeddings[p.b]
183183+ if !okA || !okB {
184184+ continue
185185+ }
186186+ sim := cosineSimilarity(vecA, vecB)
187187+ if sim <= 0 {
188188+ continue
189189+ }
190190+ boost := sim * e.config.DescriptionWeight
191191+ if _, err := tx.ExecContext(ctx,
192192+ `UPDATE _feed_sim_staging SET jaccard = jaccard + ? WHERE feed_a = ? AND feed_b = ?`,
193193+ boost, p.a, p.b,
194194+ ); err != nil {
195195+ return err
196196+ }
167197 }
168198169199 return nil
170200}
171201172172-func (e *Engine) ComputeUserSimilarity(ctx context.Context) error {
173173- tx, err := e.db.BeginTx(ctx, nil)
174174- if err != nil {
175175- return err
202202+func cosineSimilarity(a, b []float32) float64 {
203203+ var dot, normA, normB float64
204204+ for i := range a {
205205+ dot += float64(a[i]) * float64(b[i])
206206+ normA += float64(a[i]) * float64(a[i])
207207+ normB += float64(b[i]) * float64(b[i])
176208 }
177177- defer func() { _ = tx.Rollback() }()
178178-179179- if _, err := tx.ExecContext(ctx, `DELETE FROM recs.user_similarity`); err != nil {
180180- return err
209209+ if normA == 0 || normB == 0 {
210210+ return 0
181211 }
212212+ return dot / (math.Sqrt(normA) * math.Sqrt(normB))
213213+}
182214183183- _, err = tx.ExecContext(ctx, `
184184- INSERT INTO recs.user_similarity (user_a, user_b, jaccard, common_feeds)
185185- SELECT
186186- s1.user_did,
187187- s2.user_did,
188188- CAST(COUNT(*) AS REAL) / (
189189- (SELECT COUNT(*) FROM articles.subscriptions WHERE user_did = s1.user_did) +
190190- (SELECT COUNT(*) FROM articles.subscriptions WHERE user_did = s2.user_did) -
191191- CAST(COUNT(*) AS REAL)
192192- ),
193193- COUNT(*)
194194- FROM articles.subscriptions s1
195195- JOIN articles.subscriptions s2 ON s1.feed_url = s2.feed_url AND s1.user_did < s2.user_did
196196- GROUP BY s1.user_did, s2.user_did
197197- `)
215215+// ComputeUserSimilarity recomputes the user_similarity table: subscription
216216+// Jaccard + time-decayed like co-occurrence + tag overlap + follow boost.
217217+func (e *Engine) ComputeUserSimilarity(ctx context.Context) error {
218218+ conn, err := e.db.Conn(ctx)
198219 if err != nil {
199220 return err
200221 }
222222+ defer conn.Close()
201223202202- if _, err := tx.ExecContext(ctx, `
203203- CREATE TEMP TABLE IF NOT EXISTS _likes_count (author_did TEXT PRIMARY KEY, cnt INT)
204204- `); err != nil {
205205- return err
206206- }
207207- if _, err := tx.ExecContext(ctx, `DELETE FROM _likes_count`); err != nil {
208208- return err
209209- }
210210- if _, err := tx.ExecContext(ctx, `
211211- INSERT INTO _likes_count (author_did, cnt)
212212- SELECT author_did, COUNT(*) FROM articles.likes GROUP BY author_did
213213- `); err != nil {
214214- return err
215215- }
224224+ {
225225+ tx, err := conn.BeginTx(ctx, nil)
226226+ if err != nil {
227227+ return err
228228+ }
229229+ defer func() { _ = tx.Rollback() }()
216230217217- if _, err := tx.ExecContext(ctx, `
218218- CREATE TEMP TABLE IF NOT EXISTS _likes_overlap (user_a TEXT, user_b TEXT, common INT, PRIMARY KEY(user_a, user_b))
219219- `); err != nil {
220220- return err
221221- }
222222- if _, err := tx.ExecContext(ctx, `DELETE FROM _likes_overlap`); err != nil {
223223- return err
224224- }
225225- if _, err := tx.ExecContext(ctx, `
226226- INSERT INTO _likes_overlap (user_a, user_b, common)
227227- SELECT l1.author_did, l2.author_did,
228228- CAST(SUM(
229229- EXP(-0.023 * CAST(julianday('now') - julianday(l1.created_at) AS REAL))
230230- * EXP(-0.023 * CAST(julianday('now') - julianday(l2.created_at) AS REAL))
231231- ) AS INTEGER)
232232- FROM articles.likes l1
233233- JOIN articles.likes l2 ON l1.feed_url = l2.feed_url AND l1.article_url = l2.article_url
234234- AND l1.author_did < l2.author_did
235235- WHERE l1.created_at IS NOT NULL AND l2.created_at IS NOT NULL
236236- GROUP BY l1.author_did, l2.author_did
237237- `); err != nil {
238238- return err
239239- }
231231+ if _, err := tx.ExecContext(ctx, `CREATE TEMP TABLE IF NOT EXISTS _user_sim_staging (
232232+ user_a TEXT NOT NULL, user_b TEXT NOT NULL, jaccard REAL NOT NULL,
233233+ common_feeds INT NOT NULL DEFAULT 0, common_likes INT NOT NULL DEFAULT 0, common_tags INT NOT NULL DEFAULT 0,
234234+ PRIMARY KEY (user_a, user_b))`); err != nil {
235235+ return err
236236+ }
237237+ if _, err := tx.ExecContext(ctx, `DELETE FROM _user_sim_staging`); err != nil {
238238+ return err
239239+ }
240240241241- likesUpdate := fmt.Sprintf(`
242242- UPDATE recs.user_similarity SET
243243- jaccard = jaccard + %g * CAST(_likes_overlap.common AS REAL) / NULLIF(
244244- (SELECT cnt FROM _likes_count WHERE author_did = recs.user_similarity.user_a) +
245245- (SELECT cnt FROM _likes_count WHERE author_did = recs.user_similarity.user_b) -
246246- CAST(_likes_overlap.common AS REAL),
241241+ _, err = tx.ExecContext(ctx, `
242242+ INSERT INTO _user_sim_staging (user_a, user_b, jaccard, common_feeds, common_likes, common_tags)
243243+ SELECT
244244+ s1.user_did,
245245+ s2.user_did,
246246+ CAST(COUNT(*) AS REAL) / (
247247+ (SELECT COUNT(*) FROM articles.subscriptions WHERE user_did = s1.user_did) +
248248+ (SELECT COUNT(*) FROM articles.subscriptions WHERE user_did = s2.user_did) -
249249+ CAST(COUNT(*) AS REAL)
250250+ ),
251251+ COUNT(*),
252252+ 0,
247253 0
248248- ),
249249- common_likes = _likes_overlap.common
250250- FROM _likes_overlap
251251- WHERE recs.user_similarity.user_a = _likes_overlap.user_a
252252- AND recs.user_similarity.user_b = _likes_overlap.user_b
253253- `, e.config.LikesWeight)
254254+ FROM articles.subscriptions s1
255255+ JOIN articles.subscriptions s2 ON s1.feed_url = s2.feed_url AND s1.user_did < s2.user_did
256256+ GROUP BY s1.user_did, s2.user_did
257257+ `)
258258+ if err != nil {
259259+ return err
260260+ }
254261255255- if _, err := tx.ExecContext(ctx, likesUpdate); err != nil {
256256- return err
257257- }
262262+ if _, err := tx.ExecContext(ctx, `
263263+ CREATE TEMP TABLE IF NOT EXISTS _likes_count (author_did TEXT PRIMARY KEY, cnt INT)
264264+ `); err != nil {
265265+ return err
266266+ }
267267+ if _, err := tx.ExecContext(ctx, `DELETE FROM _likes_count`); err != nil {
268268+ return err
269269+ }
270270+ if _, err := tx.ExecContext(ctx, `
271271+ INSERT INTO _likes_count (author_did, cnt)
272272+ SELECT author_did, COUNT(*) FROM articles.likes GROUP BY author_did
273273+ `); err != nil {
274274+ return err
275275+ }
258276259259- likesInsert := fmt.Sprintf(`
260260- INSERT INTO recs.user_similarity (user_a, user_b, jaccard, common_feeds, common_likes)
261261- SELECT sub.user_a, sub.user_b, sub.jaccard, 0, sub.common
262262- FROM (
263263- SELECT
264264- lo.user_a,
265265- lo.user_b,
266266- %g * CAST(lo.common AS REAL) / NULLIF(
267267- (SELECT cnt FROM _likes_count WHERE author_did = lo.user_a) +
268268- (SELECT cnt FROM _likes_count WHERE author_did = lo.user_b) -
269269- CAST(lo.common AS REAL),
277277+ if _, err := tx.ExecContext(ctx, `
278278+ CREATE TEMP TABLE IF NOT EXISTS _likes_overlap (user_a TEXT, user_b TEXT, common INT, PRIMARY KEY(user_a, user_b))
279279+ `); err != nil {
280280+ return err
281281+ }
282282+ if _, err := tx.ExecContext(ctx, `DELETE FROM _likes_overlap`); err != nil {
283283+ return err
284284+ }
285285+ if _, err := tx.ExecContext(ctx, `
286286+ INSERT INTO _likes_overlap (user_a, user_b, common)
287287+ SELECT l1.author_did, l2.author_did,
288288+ CAST(SUM(
289289+ EXP(-0.023 * CAST(julianday('now') - julianday(l1.created_at) AS REAL))
290290+ * EXP(-0.023 * CAST(julianday('now') - julianday(l2.created_at) AS REAL))
291291+ ) AS INTEGER)
292292+ FROM articles.likes l1
293293+ JOIN articles.likes l2 ON l1.feed_url = l2.feed_url AND l1.article_url = l2.article_url
294294+ AND l1.author_did < l2.author_did
295295+ WHERE l1.created_at IS NOT NULL AND l2.created_at IS NOT NULL
296296+ GROUP BY l1.author_did, l2.author_did
297297+ `); err != nil {
298298+ return err
299299+ }
300300+301301+ likesUpdate := fmt.Sprintf(`
302302+ UPDATE _user_sim_staging SET
303303+ jaccard = jaccard + %g * CAST(_likes_overlap.common AS REAL) / NULLIF(
304304+ (SELECT cnt FROM _likes_count WHERE author_did = _user_sim_staging.user_a) +
305305+ (SELECT cnt FROM _likes_count WHERE author_did = _user_sim_staging.user_b) -
306306+ CAST(_likes_overlap.common AS REAL),
270307 0
271271- ) AS jaccard,
272272- lo.common
273273- FROM _likes_overlap lo
274274- ) sub WHERE 1
275275- ON CONFLICT(user_a, user_b) DO UPDATE SET
276276- jaccard = jaccard + excluded.jaccard,
277277- common_likes = excluded.common_likes
278278- `, e.config.LikesWeight)
308308+ ),
309309+ common_likes = _likes_overlap.common
310310+ FROM _likes_overlap
311311+ WHERE _user_sim_staging.user_a = _likes_overlap.user_a
312312+ AND _user_sim_staging.user_b = _likes_overlap.user_b
313313+ `, e.config.LikesWeight)
279314280280- if _, err := tx.ExecContext(ctx, likesInsert); err != nil {
281281- return err
282282- }
315315+ if _, err := tx.ExecContext(ctx, likesUpdate); err != nil {
316316+ return err
317317+ }
283318284284- if _, err := tx.ExecContext(ctx, `CREATE TEMP TABLE IF NOT EXISTS _tag_overlap (user_a TEXT, user_b TEXT, common INT)`); err != nil {
285285- return err
286286- }
287287- if _, err := tx.ExecContext(ctx, `DELETE FROM _tag_overlap`); err != nil {
288288- return err
289289- }
319319+ likesInsert := fmt.Sprintf(`
320320+ INSERT INTO _user_sim_staging (user_a, user_b, jaccard, common_feeds, common_likes)
321321+ SELECT sub.user_a, sub.user_b, sub.jaccard, 0, sub.common
322322+ FROM (
323323+ SELECT
324324+ lo.user_a,
325325+ lo.user_b,
326326+ %g * CAST(lo.common AS REAL) / NULLIF(
327327+ (SELECT cnt FROM _likes_count WHERE author_did = lo.user_a) +
328328+ (SELECT cnt FROM _likes_count WHERE author_did = lo.user_b) -
329329+ CAST(lo.common AS REAL),
330330+ 0
331331+ ) AS jaccard,
332332+ lo.common
333333+ FROM _likes_overlap lo
334334+ ) sub WHERE 1
335335+ ON CONFLICT(user_a, user_b) DO UPDATE SET
336336+ jaccard = jaccard + excluded.jaccard,
337337+ common_likes = excluded.common_likes
338338+ `, e.config.LikesWeight)
290339291291- _, err = tx.ExecContext(ctx, `
292292- INSERT INTO _tag_overlap (user_a, user_b, common)
293293- WITH user_tags AS (
294294- SELECT author_did, TRIM(value) AS tag FROM articles.annotations, json_each('["' || REPLACE(tags, ',', '","') || '"]')
295295- WHERE tags IS NOT NULL AND tags != ''
296296- )
297297- SELECT t1.author_did, t2.author_did, COUNT(DISTINCT t1.tag)
298298- FROM user_tags t1
299299- JOIN user_tags t2 ON t1.tag = t2.tag AND t1.author_did < t2.author_did
300300- GROUP BY t1.author_did, t2.author_did
301301- `)
302302- if err != nil {
303303- return err
304304- }
340340+ if _, err := tx.ExecContext(ctx, likesInsert); err != nil {
341341+ return err
342342+ }
305343306306- if _, err := tx.ExecContext(ctx, `
307307- CREATE TEMP TABLE IF NOT EXISTS _tag_count (author_did TEXT PRIMARY KEY, cnt INT)
308308- `); err != nil {
309309- return err
310310- }
311311- if _, err := tx.ExecContext(ctx, `DELETE FROM _tag_count`); err != nil {
312312- return err
313313- }
314314- if _, err := tx.ExecContext(ctx, `
315315- INSERT INTO _tag_count (author_did, cnt)
316316- WITH user_tags AS (
317317- SELECT author_did, TRIM(value) AS tag FROM articles.annotations, json_each('["' || REPLACE(tags, ',', '","') || '"]')
318318- WHERE tags IS NOT NULL AND tags != ''
319319- )
320320- SELECT author_did, COUNT(DISTINCT tag) FROM user_tags GROUP BY author_did
321321- `); err != nil {
322322- return err
323323- }
344344+ if _, err := tx.ExecContext(ctx, `CREATE TEMP TABLE IF NOT EXISTS _tag_overlap (user_a TEXT, user_b TEXT, common INT)`); err != nil {
345345+ return err
346346+ }
347347+ if _, err := tx.ExecContext(ctx, `DELETE FROM _tag_overlap`); err != nil {
348348+ return err
349349+ }
324350325325- _, err = tx.ExecContext(ctx, `
326326- INSERT OR IGNORE INTO recs.user_similarity (user_a, user_b, jaccard, common_feeds, common_tags)
327327- SELECT user_a, user_b, 0, 0, 0 FROM _tag_overlap
328328- `)
329329- if err != nil {
330330- return err
331331- }
351351+ _, err = tx.ExecContext(ctx, `
352352+ INSERT INTO _tag_overlap (user_a, user_b, common)
353353+ WITH user_tags AS (
354354+ SELECT author_did, TRIM(value) AS tag FROM articles.annotations, json_each('["' || REPLACE(tags, ',', '","') || '"]')
355355+ WHERE tags IS NOT NULL AND tags != ''
356356+ )
357357+ SELECT t1.author_did, t2.author_did, COUNT(DISTINCT t1.tag)
358358+ FROM user_tags t1
359359+ JOIN user_tags t2 ON t1.tag = t2.tag AND t1.author_did < t2.author_did
360360+ GROUP BY t1.author_did, t2.author_did
361361+ `)
362362+ if err != nil {
363363+ return err
364364+ }
332365333333- tagsUpdate := fmt.Sprintf(`
334334- UPDATE recs.user_similarity SET
335335- jaccard = jaccard + %g * CAST(_tag_overlap.common AS REAL) / NULLIF(
336336- (SELECT cnt FROM _tag_count WHERE author_did = recs.user_similarity.user_a) +
337337- (SELECT cnt FROM _tag_count WHERE author_did = recs.user_similarity.user_b) -
338338- CAST(_tag_overlap.common AS REAL),
339339- 0
340340- ),
341341- common_tags = _tag_overlap.common
342342- FROM _tag_overlap
343343- WHERE recs.user_similarity.user_a = _tag_overlap.user_a
344344- AND recs.user_similarity.user_b = _tag_overlap.user_b
345345- `, e.config.TagsWeight)
366366+ if _, err := tx.ExecContext(ctx, `
367367+ CREATE TEMP TABLE IF NOT EXISTS _tag_count (author_did TEXT PRIMARY KEY, cnt INT)
368368+ `); err != nil {
369369+ return err
370370+ }
371371+ if _, err := tx.ExecContext(ctx, `DELETE FROM _tag_count`); err != nil {
372372+ return err
373373+ }
374374+ if _, err := tx.ExecContext(ctx, `
375375+ INSERT INTO _tag_count (author_did, cnt)
376376+ WITH user_tags AS (
377377+ SELECT author_did, TRIM(value) AS tag FROM articles.annotations, json_each('["' || REPLACE(tags, ',', '","') || '"]')
378378+ WHERE tags IS NOT NULL AND tags != ''
379379+ )
380380+ SELECT author_did, COUNT(DISTINCT tag) FROM user_tags GROUP BY author_did
381381+ `); err != nil {
382382+ return err
383383+ }
346384347347- if _, err := tx.ExecContext(ctx, tagsUpdate); err != nil {
348348- return err
385385+ _, err = tx.ExecContext(ctx, `
386386+ INSERT OR IGNORE INTO _user_sim_staging (user_a, user_b, jaccard, common_feeds, common_tags)
387387+ SELECT user_a, user_b, 0, 0, 0 FROM _tag_overlap
388388+ `)
389389+ if err != nil {
390390+ return err
391391+ }
392392+393393+ tagsUpdate := fmt.Sprintf(`
394394+ UPDATE _user_sim_staging SET
395395+ jaccard = jaccard + %g * CAST(_tag_overlap.common AS REAL) / NULLIF(
396396+ (SELECT cnt FROM _tag_count WHERE author_did = _user_sim_staging.user_a) +
397397+ (SELECT cnt FROM _tag_count WHERE author_did = _user_sim_staging.user_b) -
398398+ CAST(_tag_overlap.common AS REAL),
399399+ 0
400400+ ),
401401+ common_tags = _tag_overlap.common
402402+ FROM _tag_overlap
403403+ WHERE _user_sim_staging.user_a = _tag_overlap.user_a
404404+ AND _user_sim_staging.user_b = _tag_overlap.user_b
405405+ `, e.config.TagsWeight)
406406+407407+ if _, err := tx.ExecContext(ctx, tagsUpdate); err != nil {
408408+ return err
409409+ }
410410+411411+ followQuery := fmt.Sprintf(`
412412+ INSERT INTO _user_sim_staging (user_a, user_b, jaccard, common_feeds, common_likes, common_tags)
413413+ SELECT
414414+ MIN(f.user_did, f.target_did),
415415+ MAX(f.user_did, f.target_did),
416416+ %g,
417417+ 0, 0, 0
418418+ FROM main.follows f
419419+ WHERE f.user_did != f.target_did
420420+ GROUP BY MIN(f.user_did, f.target_did), MAX(f.user_did, f.target_did)
421421+ ON CONFLICT(user_a, user_b) DO UPDATE SET
422422+ jaccard = jaccard + %g
423423+ `, e.config.FollowBoost, e.config.FollowBoost)
424424+425425+ if _, err := tx.ExecContext(ctx, followQuery); err != nil {
426426+ return err
427427+ }
428428+429429+ if err := tx.Commit(); err != nil {
430430+ return err
431431+ }
349432 }
350433351351- followQuery := fmt.Sprintf(`
352352- INSERT INTO recs.user_similarity (user_a, user_b, jaccard, common_feeds, common_likes, common_tags)
353353- SELECT
354354- MIN(f.user_did, f.target_did),
355355- MAX(f.user_did, f.target_did),
356356- %g,
357357- 0, 0, 0
358358- FROM main.follows f
359359- WHERE f.user_did != f.target_did
360360- GROUP BY MIN(f.user_did, f.target_did), MAX(f.user_did, f.target_did)
361361- ON CONFLICT(user_a, user_b) DO UPDATE SET
362362- jaccard = jaccard + %g
363363- `, e.config.FollowBoost, e.config.FollowBoost)
434434+ {
435435+ tx, err := conn.BeginTx(ctx, nil)
436436+ if err != nil {
437437+ return err
438438+ }
439439+ defer func() { _ = tx.Rollback() }()
364440365365- if _, err := tx.ExecContext(ctx, followQuery); err != nil {
366366- return err
367367- }
441441+ if _, err := tx.ExecContext(ctx, `DELETE FROM recs.user_similarity`); err != nil {
442442+ return err
443443+ }
444444+ if _, err := tx.ExecContext(ctx, `INSERT INTO recs.user_similarity (user_a, user_b, jaccard, common_feeds, common_likes, common_tags) SELECT user_a, user_b, jaccard, common_feeds, common_likes, common_tags FROM _user_sim_staging`); err != nil {
445445+ return err
446446+ }
368447369369- e.logger.Info("user similarity computed")
370370- return tx.Commit()
448448+ e.logger.Info("user similarity computed")
449449+ return tx.Commit()
450450+ }
371451}
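Review note on `cosineSimilarity`: the loop indexes `b` by the length of `a`. If `a` is longer than `b`, it panics with an index out of range. If `a` is shorter, it silently compares only a prefix of `b`. One way this could happen is if rows in `recs.feed_embeddings` were written before a change to `GLEAN_EMBED_DIMENSION`. Below is a minimal sketch of a guarded variant, assuming mismatched or zero vectors should simply be skipped. `cosineSimilarityChecked` is a hypothetical name, and the skip policy is an assumption rather than something the PR specifies.

```go
package main

import (
	"fmt"
	"math"
)

// cosineSimilarityChecked (hypothetical) returns the cosine similarity of a
// and b. The boolean is false when the vectors are empty, differ in length,
// or either has zero norm, so the caller can skip the pair.
func cosineSimilarityChecked(a, b []float32) (float64, bool) {
	if len(a) == 0 || len(a) != len(b) {
		return 0, false
	}
	var dot, normA, normB float64
	for i := range a {
		x, y := float64(a[i]), float64(b[i])
		dot += x * y
		normA += x * x
		normB += y * y
	}
	if normA == 0 || normB == 0 {
		return 0, false
	}
	return dot / (math.Sqrt(normA) * math.Sqrt(normB)), true
}

func main() {
	sim, ok := cosineSimilarityChecked([]float32{1, 2, 3}, []float32{2, 4, 6})
	fmt.Println(sim, ok)

	// Length mismatch: reported as not comparable instead of panicking.
	_, ok = cosineSimilarityChecked([]float32{1, 2, 3}, []float32{1, 2})
	fmt.Println(ok)
}
```

In `computeEmbeddingSimilarity` the caller already skips pairs with `sim <= 0`, so a guarded version would slot in by also skipping when the boolean is false.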
···33import (
44 "context"
55 "database/sql"
66+ "fmt"
77+88+ vec "github.com/asg017/sqlite-vec-go-bindings/cgo"
69)
710811type FeedRecommendation struct {
···4043 Score float64
4144}
42454646+// GetFeedRecommendations returns feed recommendations for a user. Users with
4747+// fewer than 5 subscriptions get cold-start recommendations (embedding-based
4848+// KNN or graph+popular fallback). Results are min-max normalized and
4949+// diversity-filtered before returning.
4350func (e *Engine) GetFeedRecommendations(ctx context.Context, userDID string, limit int) ([]*FeedRecommendation, error) {
4451 subCount := 0
4552 _ = e.db.QueryRowContext(ctx, `SELECT COUNT(*) FROM articles.subscriptions WHERE user_did = ?`, userDID).Scan(&subCount)
···4754 if subCount < 5 {
4855 recs, err := e.ColdStartRecommendations(ctx, userDID, limit*2)
4956 if err == nil && len(recs) > 0 {
5757+ normalizeFeedScores(recs)
5058 return ApplyDiversity(recs, limit), nil
5159 }
5260 }
···5664 return nil, err
5765 }
58666767+ normalizeFeedScores(recs)
5968 return ApplyDiversity(recs, limit), nil
6069}
61707171+// GetPeopleRecommendations returns similar users based on subscription overlap,
7272+// like co-occurrence, tag overlap, and follow relationships. Scores are min-max
7373+// normalized on the Jaccard field.
6274func (e *Engine) GetPeopleRecommendations(ctx context.Context, userDID string, limit int) ([]*PersonRecommendation, error) {
6363- return e.ComputePeopleRecommendationsOnDemand(ctx, userDID, limit)
7575+ recs, err := e.ComputePeopleRecommendationsOnDemand(ctx, userDID, limit)
7676+ if err != nil {
7777+ return nil, err
7878+ }
7979+ normalizePersonScores(recs)
8080+ return recs, nil
6481}
65828383+// GetArticleRecommendations returns article recommendations combining social
8484+// signals (liked by similar users, followed users' feeds), content similarity
8585+// (embedding KNN against user's liked articles), and recency. Scores are
8686+// min-max normalized.
6687func (e *Engine) GetArticleRecommendations(ctx context.Context, userDID string, limit int) ([]*ArticleRecommendation, error) {
6767- return e.ComputeArticleRecommendationsOnDemand(ctx, userDID, limit)
8888+ recs, err := e.ComputeArticleRecommendationsOnDemand(ctx, userDID, limit)
8989+ if err != nil {
9090+ return nil, err
9191+ }
9292+ normalizeArticleScores(recs)
9393+ return recs, nil
6894}
69959696+// SignalWeights holds per-signal multipliers used in the recommendation scoring
9797+// formula. Weights are auto-tuned per user via a bandit-style reward/penalty
9898+// system (see weights.go). Each field maps to a column in
9999+// recs.user_signal_weights.
70100type SignalWeights struct {
71101 WSub float64
72102 WLike float64
···74104 WSocial float64
75105 WPop float64
76106 WCategory float64
107107+ WContent float64
77108}
7810979110func defaultWeights() SignalWeights {
···84115 WSocial: 0.7,
85116 WPop: 0.2,
86117 WCategory: 0.4,
118118+ WContent: 0.4,
87119 }
88120}
89121···91123 w := defaultWeights()
92124 var dbW SignalWeights
93125 err := e.db.QueryRowContext(ctx, `
9494- SELECT w_sub, w_like, w_tag, w_social, w_pop, w_category
126126+ SELECT w_sub, w_like, w_tag, w_social, w_pop, w_category, w_content
95127 FROM recs.user_signal_weights WHERE user_did = ?
9696- `, userDID).Scan(&dbW.WSub, &dbW.WLike, &dbW.WTag, &dbW.WSocial, &dbW.WPop, &dbW.WCategory)
128128+ `, userDID).Scan(&dbW.WSub, &dbW.WLike, &dbW.WTag, &dbW.WSocial, &dbW.WPop, &dbW.WCategory, &dbW.WContent)
97129 if err == nil {
98130 return dbW
99131 }
···115147 FROM similar_users su
116148 JOIN articles.subscriptions s ON s.user_did = su.peer
117149 WHERE s.feed_url NOT IN (SELECT feed_url FROM articles.subscriptions WHERE user_did = ?)
118118- AND s.feed_url NOT IN (SELECT target_id FROM recs.dismissed_recommendations WHERE user_did = ? AND target_type = 'feed')
150150+ AND s.feed_url NOT IN (SELECT target_id FROM main.dismissed_recommendations WHERE user_did = ? AND target_type = 'feed')
119151 GROUP BY s.feed_url
120152 ),
121153 like_signals AS (
···125157 JOIN articles.likes l ON l.author_did = su.peer
126158 JOIN articles.subscriptions s ON s.feed_url = l.feed_url
127159 WHERE s.feed_url NOT IN (SELECT feed_url FROM articles.subscriptions WHERE user_did = ?)
128128- AND s.feed_url NOT IN (SELECT target_id FROM recs.dismissed_recommendations WHERE user_did = ? AND target_type = 'feed')
160160+ AND s.feed_url NOT IN (SELECT target_id FROM main.dismissed_recommendations WHERE user_did = ? AND target_type = 'feed')
129161 GROUP BY s.feed_url
130162 ),
131163 social_boost AS (
132164 SELECT s.feed_url,
133133- SUM(CASE WHEN fd.distance = 1 THEN 1.0 ELSE 0.3 END) AS social
165165+ SUM(CASE fd.distance WHEN 1 THEN 1.0 WHEN 2 THEN 0.3 WHEN 3 THEN 0.1 ELSE 0 END) AS social
134166 FROM recs.follow_distances fd
135167 JOIN articles.subscriptions s ON s.user_did = fd.user_b
136168 WHERE fd.user_a = ?
137169 AND s.feed_url NOT IN (SELECT feed_url FROM articles.subscriptions WHERE user_did = ?)
138138- AND s.feed_url NOT IN (SELECT target_id FROM recs.dismissed_recommendations WHERE user_did = ? AND target_type = 'feed')
170170+ AND s.feed_url NOT IN (SELECT target_id FROM main.dismissed_recommendations WHERE user_did = ? AND target_type = 'feed')
139171 GROUP BY s.feed_url
140172 ),
141173 category_counts AS (
···183215 return results, rows.Err()
184216}
185217218218+func normalizeFeedScores(recs []*FeedRecommendation) {
219219+ if len(recs) < 2 {
220220+ return
221221+ }
222222+ min, max := recs[0].Score, recs[0].Score
223223+ for _, r := range recs[1:] {
224224+ if r.Score < min {
225225+ min = r.Score
226226+ }
227227+ if r.Score > max {
228228+ max = r.Score
229229+ }
230230+ }
231231+ if max == min {
232232+ return
233233+ }
234234+ span := max - min
235235+ for _, r := range recs {
236236+ r.Score = (r.Score - min) / span
237237+ }
238238+}
239239+240240+func normalizeArticleScores(recs []*ArticleRecommendation) {
241241+ if len(recs) < 2 {
242242+ return
243243+ }
244244+ min, max := recs[0].Score, recs[0].Score
245245+ for _, r := range recs[1:] {
246246+ if r.Score < min {
247247+ min = r.Score
248248+ }
249249+ if r.Score > max {
250250+ max = r.Score
251251+ }
252252+ }
253253+ if max == min {
254254+ return
255255+ }
256256+ span := max - min
257257+ for _, r := range recs {
258258+ r.Score = (r.Score - min) / span
259259+ }
260260+}
261261+262262+func normalizePersonScores(recs []*PersonRecommendation) {
263263+ if len(recs) < 2 {
264264+ return
265265+ }
266266+ min, max := recs[0].Jaccard, recs[0].Jaccard
267267+ for _, r := range recs[1:] {
268268+ if r.Jaccard < min {
269269+ min = r.Jaccard
270270+ }
271271+ if r.Jaccard > max {
272272+ max = r.Jaccard
273273+ }
274274+ }
275275+ if max == min {
276276+ return
277277+ }
278278+ span := max - min
279279+ for _, r := range recs {
280280+ r.Jaccard = (r.Jaccard - min) / span
281281+ }
282282+}
283283+284284+func (e *Engine) coldStartFromEmbeddings(ctx context.Context, userDID string, limit int) ([]*FeedRecommendation, error) {
285285+ if e.embedder == nil {
286286+ return nil, nil
287287+ }
288288+289289+ conn, err := e.db.Conn(ctx)
290290+ if err != nil {
291291+ return nil, err
292292+ }
293293+ defer conn.Close()
294294+295295+ subRows, err := conn.QueryContext(ctx, `
296296+ SELECT fe.feed_url, fe.embedding FROM articles.subscriptions s
297297+ JOIN recs.feed_embeddings fe ON fe.feed_url = s.feed_url
298298+ WHERE s.user_did = ?
299299+ `, userDID)
300300+ if err != nil {
301301+ return nil, err
302302+ }
303303+304304+ dim := e.embedder.Dimension()
305305+ sumVec := make([]float32, dim)
306306+ subCount := 0
307307+ var subFeedURLs []string
308308+ for subRows.Next() {
309309+ var url string
310310+ var blob []byte
311311+ if err := subRows.Scan(&url, &blob); err != nil {
312312+ subRows.Close()
313313+ return nil, err
314314+ }
315315+ v := deserializeFloat32(blob)
316316+ if len(v) != dim {
317317+ continue
318318+ }
319319+ for j := range sumVec {
320320+ sumVec[j] += v[j]
321321+ }
322322+ subCount++
323323+ subFeedURLs = append(subFeedURLs, url)
324324+ }
325325+ subRows.Close()
326326+327327+ if subCount == 0 {
328328+ return nil, nil
329329+ }
330330+331331+ avgVec := make([]float32, dim)
332332+ for j := range avgVec {
333333+ avgVec[j] = sumVec[j] / float32(subCount)
334334+ }
335335+336336+ subSet := make(map[string]bool, len(subFeedURLs))
337337+ for _, u := range subFeedURLs {
338338+ subSet[u] = true
339339+ }
340340+341341+ queryBlob, err := vec.SerializeFloat32(avgVec)
342342+ if err != nil {
343343+ return nil, fmt.Errorf("serialize query vector: %w", err)
344344+ }
345345+346346+ knnRows, err := conn.QueryContext(ctx, `
347347+ SELECT fe.feed_url, fe.distance, COALESCE(f.title, ''), COALESCE(f.site_url, ''),
348348+ COALESCE(f.description, ''), f.subscriber_count, COALESCE(f.favicon_url, '')
349349+ FROM recs.feed_embeddings fe
350350+ JOIN articles.feeds f ON f.feed_url = fe.feed_url
351351+ WHERE fe.embedding MATCH ? AND fe.k = ?
352352+ ORDER BY fe.distance
353353+ `, queryBlob, limit+len(subSet))
354354+ if err != nil {
355355+ return nil, err
356356+ }
357357+358358+ var results []*FeedRecommendation
359359+ for knnRows.Next() {
360360+ var r FeedRecommendation
361361+ var dist float64
362362+ if err := knnRows.Scan(&r.FeedURL, &dist, &r.Title, &r.SiteURL,
363363+ &r.Description, &r.SubscriberCount, &r.FaviconURL); err != nil {
364364+ knnRows.Close()
365365+ return nil, err
366366+ }
367367+ if subSet[r.FeedURL] {
368368+ continue
369369+ }
370370+ r.Score = 1.0 - dist
371371+ if r.Score <= 0 {
372372+ continue
373373+ }
374374+ results = append(results, &r)
375375+ if len(results) >= limit {
376376+ break
377377+ }
378378+ }
379379+ knnRows.Close()
380380+381381+ return results, nil
382382+}
383383+186384func (e *Engine) ComputeArticleRecommendationsOnDemand(ctx context.Context, userDID string, limit int) ([]*ArticleRecommendation, error) {
187385 w := e.GetWeights(ctx, userDID)
188386189189- rows, err := e.db.QueryContext(ctx, `
387387+ conn, err := e.db.Conn(ctx)
388388+ if err != nil {
389389+ return nil, err
390390+ }
391391+ defer conn.Close()
392392+393393+ if err := e.ensureContentBoostTable(ctx, conn); err != nil {
394394+ return nil, err
395395+ }
396396+397397+ if e.embedder != nil {
398398+ if err := e.populateContentBoost(ctx, conn, userDID); err != nil {
399399+ e.logger.Warn("content boost failed", "error", err)
400400+ }
401401+ }
402402+403403+ rows, err := conn.QueryContext(ctx, `
190404 WITH similar_users AS (
191405 SELECT user_b AS peer, jaccard FROM recs.user_similarity WHERE user_a = ? AND jaccard > 0.15
192406 UNION ALL
···201415 SELECT 1 FROM articles.likes ul WHERE ul.author_did = ? AND ul.feed_url = l.feed_url AND ul.article_url = l.article_url
202416 )
203417 AND NOT EXISTS (
204204- SELECT 1 FROM recs.dismissed_recommendations d WHERE d.user_did = ? AND d.target_type = 'article' AND d.target_id = l.article_url
418418+ SELECT 1 FROM main.dismissed_recommendations d WHERE d.user_did = ? AND d.target_type = 'article' AND d.target_id = l.article_url
205419 )
206420 GROUP BY l.feed_url, l.article_url
207421 ),
208422 social_likes AS (
209423 SELECT l.feed_url, l.article_url,
210210- SUM(CASE WHEN fd.distance = 1 THEN 1.0 ELSE 0.3 END) AS social
424424+ SUM(CASE fd.distance WHEN 1 THEN 1.0 WHEN 2 THEN 0.3 WHEN 3 THEN 0.1 ELSE 0 END) AS social
211425 FROM recs.follow_distances fd
212426 JOIN articles.likes l ON l.author_did = fd.user_b
213427 WHERE fd.user_a = ?
···222436 COALESCE(rs.is_read, 0),
223437 COALESCE(la.like_signal, 0) * ?
224438 + COALESCE(sl.social, 0) * ?
439439+ + COALESCE(cb.score, 0) * ?
225440 + EXP(-0.023 * CAST(julianday('now') - julianday(a.published) AS REAL)) * 0.2
226441 AS score
227442 FROM liked_articles la
228443 JOIN articles.articles a ON a.feed_url = la.feed_url AND a.url = la.article_url
229444 LEFT JOIN articles.feeds f ON f.feed_url = la.feed_url
230445 LEFT JOIN social_likes sl ON sl.feed_url = la.feed_url AND sl.article_url = la.article_url
446446+ LEFT JOIN _content_boost cb ON cb.article_id = a.id
231447 LEFT JOIN articles.read_state rs ON rs.article_id = a.id AND rs.user_did = ?
232448 WHERE COALESCE(rs.is_read, 0) = 0
233449 ORDER BY score DESC, (CASE WHEN a.published > 'now' THEN 1 ELSE 0 END), a.published DESC
234450 LIMIT ?
235235- `, userDID, userDID, userDID, userDID, userDID, userDID, w.WLike, w.WSocial, userDID, limit)
451451+ `, userDID, userDID, userDID, userDID, userDID, userDID,
452452+ w.WLike, w.WSocial, w.WContent, userDID, limit)
236453 if err != nil {
237454 return nil, err
238455 }
···282499}
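Review note: the article score in the query above is a weighted sum of the like, social, and content signals plus a fixed-weight recency term, `EXP(-0.023 * age_in_days) * 0.2`. A decay rate of 0.023 per day corresponds to a half-life of roughly 30 days (ln 2 / 0.023 ≈ 30.1). The sketch below reproduces that arithmetic in Go; `weights` and `articleScore` are hypothetical names, and the inputs are made-up values for illustration.

```go
package main

import (
	"fmt"
	"math"
)

// weights is a hypothetical subset of SignalWeights used by the article query.
type weights struct {
	like, social, content float64
}

// articleScore mirrors the SQL expression: each signal times its weight,
// plus an exponentially decaying recency bonus capped at 0.2.
func articleScore(w weights, likeSignal, social, content, ageDays float64) float64 {
	recency := math.Exp(-0.023*ageDays) * 0.2
	return likeSignal*w.like + social*w.social + content*w.content + recency
}

func main() {
	w := weights{like: 0.5, social: 0.7, content: 0.4} // defaults from defaultWeights()
	fmt.Printf("fresh:       %.4f\n", articleScore(w, 2, 1, 0.5, 0))
	fmt.Printf("30 days old: %.4f\n", articleScore(w, 2, 1, 0.5, 30))
}
```

Because `COALESCE(cb.score, 0)` yields 0 when no embedder is configured, the content term simply drops out and the score reduces to the pre-existing formula.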
283500284501func (e *Engine) ComputeSignalProfiles(ctx context.Context) error {
285285- tx, err := e.db.BeginTx(ctx, nil)
502502+ conn, err := e.db.Conn(ctx)
286503 if err != nil {
287504 return err
288505 }
289289- defer func() { _ = tx.Rollback() }()
506506+ defer conn.Close()
290507291291- if _, err := tx.ExecContext(ctx, `DELETE FROM recs.user_signal_profiles`); err != nil {
292292- return err
293293- }
508508+ {
509509+ tx, err := conn.BeginTx(ctx, nil)
510510+ if err != nil {
511511+ return err
512512+ }
513513+ defer func() { _ = tx.Rollback() }()
294514295295- if _, err := tx.ExecContext(ctx, `
296296- CREATE TEMP TABLE IF NOT EXISTS _user_like_counts (user_did TEXT PRIMARY KEY, cnt INT)
297297- `); err != nil {
298298- return err
299299- }
300300- if _, err := tx.ExecContext(ctx, `DELETE FROM _user_like_counts`); err != nil {
301301- return err
302302- }
303303- if _, err := tx.ExecContext(ctx, `
304304- INSERT INTO _user_like_counts SELECT author_did, COUNT(*) FROM articles.likes GROUP BY author_did
305305- `); err != nil {
306306- return err
307307- }
515515+ if _, err := tx.ExecContext(ctx, `
516516+ CREATE TEMP TABLE IF NOT EXISTS _user_like_counts (user_did TEXT PRIMARY KEY, cnt INT)
517517+ `); err != nil {
518518+ return err
519519+ }
520520+ if _, err := tx.ExecContext(ctx, `DELETE FROM _user_like_counts`); err != nil {
521521+ return err
522522+ }
523523+ if _, err := tx.ExecContext(ctx, `
524524+ INSERT INTO _user_like_counts SELECT author_did, COUNT(*) FROM articles.likes GROUP BY author_did
525525+ `); err != nil {
526526+ return err
527527+ }
308528309309- if _, err := tx.ExecContext(ctx, `
310310- CREATE TEMP TABLE IF NOT EXISTS _user_tag_counts (user_did TEXT PRIMARY KEY, cnt INT)
311311- `); err != nil {
312312- return err
313313- }
314314- if _, err := tx.ExecContext(ctx, `DELETE FROM _user_tag_counts`); err != nil {
315315- return err
316316- }
317317- if _, err := tx.ExecContext(ctx, `
318318- INSERT INTO _user_tag_counts
319319- WITH user_tags AS (
320320- SELECT author_did, TRIM(value) AS tag
321321- FROM articles.annotations, json_each('["' || REPLACE(tags, ',', '","') || '"]')
322322- WHERE tags IS NOT NULL AND tags != ''
323323- )
324324- SELECT author_did, COUNT(DISTINCT tag) FROM user_tags GROUP BY author_did
325325- `); err != nil {
326326- return err
327327- }
529529+ if _, err := tx.ExecContext(ctx, `
530530+ CREATE TEMP TABLE IF NOT EXISTS _user_tag_counts (user_did TEXT PRIMARY KEY, cnt INT)
531531+ `); err != nil {
532532+ return err
533533+ }
534534+ if _, err := tx.ExecContext(ctx, `DELETE FROM _user_tag_counts`); err != nil {
535535+ return err
536536+ }
537537+ if _, err := tx.ExecContext(ctx, `
538538+ INSERT INTO _user_tag_counts
539539+ WITH user_tags AS (
540540+ SELECT author_did, TRIM(value) AS tag
541541+ FROM articles.annotations, json_each('["' || REPLACE(tags, ',', '","') || '"]')
542542+ WHERE tags IS NOT NULL AND tags != ''
543543+ )
544544+ SELECT author_did, COUNT(DISTINCT tag) FROM user_tags GROUP BY author_did
545545+ `); err != nil {
546546+ return err
547547+ }
328548329329- if _, err := tx.ExecContext(ctx, `
330330- CREATE TEMP TABLE IF NOT EXISTS _user_top_categories (user_did TEXT PRIMARY KEY, categories TEXT)
331331- `); err != nil {
332332- return err
333333- }
334334- if _, err := tx.ExecContext(ctx, `DELETE FROM _user_top_categories`); err != nil {
335335- return err
336336- }
337337- if _, err := tx.ExecContext(ctx, `
338338- INSERT INTO _user_top_categories
339339- SELECT user_did, '[' || GROUP_CONCAT('{"c":"' || category || '","n":"' || CAST(cnt AS TEXT) || '}') || ']'
340340- FROM (
341341- SELECT user_did, category, COUNT(*) AS cnt
342342- FROM articles.subscriptions
343343- WHERE category IS NOT NULL AND category != ''
344344- GROUP BY user_did, category
345345- ORDER BY COUNT(*) DESC
346346- LIMIT 5
347347- )
348348- GROUP BY user_did
349349- `); err != nil {
350350- return err
351351- }
549549+ if _, err := tx.ExecContext(ctx, `
550550+ CREATE TEMP TABLE IF NOT EXISTS _user_top_categories (user_did TEXT PRIMARY KEY, categories TEXT)
551551+ `); err != nil {
552552+ return err
553553+ }
554554+ if _, err := tx.ExecContext(ctx, `DELETE FROM _user_top_categories`); err != nil {
555555+ return err
556556+ }
557557+ if _, err := tx.ExecContext(ctx, `
558558+ INSERT INTO _user_top_categories
559559+ SELECT user_did, '[' || GROUP_CONCAT('{"c":"' || category || '","n":' || CAST(cnt AS TEXT) || '}') || ']'
560560+ FROM (
561561+ SELECT user_did, category, COUNT(*) AS cnt
562562+ FROM articles.subscriptions
563563+ WHERE category IS NOT NULL AND category != ''
564564+ GROUP BY user_did, category
565565+ ORDER BY COUNT(*) DESC
566566+ LIMIT 5
567567+ )
568568+ GROUP BY user_did
569569+ `); err != nil {
570570+ return err
571571+ }
352572353353- _, err = tx.ExecContext(ctx, `
354354- INSERT INTO recs.user_signal_profiles (user_did, total_likes, total_tags, top_categories)
355355- SELECT
356356- u.did,
357357- COALESCE(lc.cnt, 0),
358358- COALESCE(tc.cnt, 0),
359359- COALESCE(cc.categories, '[]')
360360- FROM main.users u
361361- LEFT JOIN _user_like_counts lc ON lc.user_did = u.did
362362- LEFT JOIN _user_tag_counts tc ON tc.user_did = u.did
363363- LEFT JOIN _user_top_categories cc ON cc.user_did = u.did
364364- `)
365365- if err != nil {
366366- return err
573573+ if _, err := tx.ExecContext(ctx, `
574574+ CREATE TEMP TABLE IF NOT EXISTS _signal_profiles_staging (
575575+ user_did TEXT PRIMARY KEY, total_likes INT, total_tags INT, top_categories TEXT
576576+ )
577577+ `); err != nil {
578578+ return err
579579+ }
580580+ if _, err := tx.ExecContext(ctx, `DELETE FROM _signal_profiles_staging`); err != nil {
581581+ return err
582582+ }
583583+ if _, err := tx.ExecContext(ctx, `
584584+ INSERT INTO _signal_profiles_staging (user_did, total_likes, total_tags, top_categories)
585585+ SELECT
586586+ u.did,
587587+ COALESCE(lc.cnt, 0),
588588+ COALESCE(tc.cnt, 0),
589589+ COALESCE(cc.categories, '[]')
590590+ FROM main.users u
591591+ LEFT JOIN _user_like_counts lc ON lc.user_did = u.did
592592+ LEFT JOIN _user_tag_counts tc ON tc.user_did = u.did
593593+ LEFT JOIN _user_top_categories cc ON cc.user_did = u.did
594594+ `); err != nil {
595595+ return err
596596+ }
597597+598598+ if err := tx.Commit(); err != nil {
599599+ return err
600600+ }
367601 }
368602369369- e.logger.Info("signal profiles computed")
370370- return tx.Commit()
603603+ {
604604+ tx, err := conn.BeginTx(ctx, nil)
605605+ if err != nil {
606606+ return err
607607+ }
608608+ defer func() { _ = tx.Rollback() }()
609609+610610+ if _, err := tx.ExecContext(ctx, `DELETE FROM recs.user_signal_profiles`); err != nil {
611611+ return err
612612+ }
613613+ if _, err := tx.ExecContext(ctx, `INSERT INTO recs.user_signal_profiles (user_did, total_likes, total_tags, top_categories) SELECT user_did, total_likes, total_tags, top_categories FROM _signal_profiles_staging`); err != nil {
614614+ return err
615615+ }
616616+617617+ e.logger.Info("signal profiles computed")
618618+ return tx.Commit()
619619+ }
371620}
372621373622func (e *Engine) ColdStartRecommendations(ctx context.Context, userDID string, limit int) ([]*FeedRecommendation, error) {
···377626 return nil, nil
378627 }
379628629629+ recs, err := e.coldStartFromEmbeddings(ctx, userDID, limit)
630630+ if err != nil {
631631+ e.logger.Warn("embedding cold start failed", "error", err)
632632+ }
633633+ if len(recs) > 0 {
634634+ return recs, nil
635635+ }
636636+637637+ return e.coldStartFromGraphAndPopular(ctx, userDID, limit)
638638+}
639639+640640+func (e *Engine) coldStartFromGraphAndPopular(ctx context.Context, userDID string, limit int) ([]*FeedRecommendation, error) {
380641 rows, err := e.db.QueryContext(ctx, `
381642 WITH followed_feeds AS (
382643 SELECT s.feed_url, 1.0 AS weight
···384645 JOIN articles.subscriptions s ON s.user_did = fd.user_b
385646 WHERE fd.user_a = ? AND fd.distance = 1
386647 AND s.feed_url NOT IN (SELECT feed_url FROM articles.subscriptions WHERE user_did = ?)
387387- AND s.feed_url NOT IN (SELECT target_id FROM recs.dismissed_recommendations WHERE user_did = ? AND target_type = 'feed')
648648+ AND s.feed_url NOT IN (SELECT target_id FROM main.dismissed_recommendations WHERE user_did = ? AND target_type = 'feed')
388649 ),
389650 popular_feeds AS (
390651 SELECT feed_url, subscriber_count,
···392653 FROM articles.feeds
393654 WHERE subscriber_count > 0
394655 AND feed_url NOT IN (SELECT feed_url FROM articles.subscriptions WHERE user_did = ?)
395395- AND feed_url NOT IN (SELECT target_id FROM recs.dismissed_recommendations WHERE user_did = ? AND target_type = 'feed')
656656+ AND feed_url NOT IN (SELECT target_id FROM main.dismissed_recommendations WHERE user_did = ? AND target_type = 'feed')
396657 ORDER BY subscriber_count DESC
397658 LIMIT 50
398659 ),
+128-25
internal/cluster/social.go
···2233import (
44 "context"
55+ "fmt"
56)
6777-func (e *Engine) ComputeFollowDistances(ctx context.Context) error {
88+const maxFollowDepth = 3
99+1010+type followDistance struct {
1111+ userA string
1212+ userB string
1313+ distance int
1414+}
1515+1616+func (e *Engine) ComputeFollowDistancesData(ctx context.Context, sources []string) ([]followDistance, error) {
1717+ if len(sources) == 0 {
1818+ return nil, nil
1919+ }
2020+2121+ rows, err := e.db.QueryContext(ctx, `SELECT user_did, target_did FROM main.follows WHERE user_did != target_did`)
2222+ if err != nil {
2323+ return nil, err
2424+ }
2525+ defer rows.Close()
2626+2727+ adj := make(map[string][]string)
2828+ for rows.Next() {
2929+ var src, dst string
3030+ if err := rows.Scan(&src, &dst); err != nil {
3131+ return nil, err
3232+ }
3333+ adj[src] = append(adj[src], dst)
3434+ }
3535+ if err := rows.Err(); err != nil {
3636+ return nil, err
3737+ }
3838+3939+ var result []followDistance
4040+ for _, src := range sources {
4141+ dist := map[string]int{src: 0}
4242+ queue := []string{src}
4343+ for len(queue) > 0 {
4444+ cur := queue[0]
4545+ queue = queue[1:]
4646+ d := dist[cur]
4747+ if d >= maxFollowDepth {
4848+ continue
4949+ }
5050+ for _, next := range adj[cur] {
5151+ if _, ok := dist[next]; !ok {
5252+ dist[next] = d + 1
5353+ queue = append(queue, next)
5454+ }
5555+ }
5656+ }
5757+ for other, d := range dist {
5858+ if d > 0 {
5959+ result = append(result, followDistance{userA: src, userB: other, distance: d})
6060+ }
6161+ }
6262+ }
6363+6464+ return result, nil
6565+}
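Review note: `ComputeFollowDistancesData` replaces the old two-level SQL self-join with a breadth-first search that stops expanding at `maxFollowDepth`. The sketch below is the same traversal over an in-memory adjacency map, so the depth cutoff can be checked without a database. `boundedBFS` is a hypothetical name and the graph is made-up data.

```go
package main

import "fmt"

// boundedBFS returns the shortest hop count from src to every node reachable
// within maxDepth hops. Nodes at exactly maxDepth are recorded but not
// expanded, which is what the d >= maxFollowDepth check in the diff does.
func boundedBFS(adj map[string][]string, src string, maxDepth int) map[string]int {
	dist := map[string]int{src: 0}
	queue := []string{src}
	for len(queue) > 0 {
		cur := queue[0]
		queue = queue[1:]
		d := dist[cur]
		if d >= maxDepth {
			continue
		}
		for _, next := range adj[cur] {
			if _, seen := dist[next]; !seen {
				dist[next] = d + 1
				queue = append(queue, next)
			}
		}
	}
	delete(dist, src) // the diff only emits pairs with distance > 0
	return dist
}

func main() {
	// a follows b, b follows c, and so on.
	adj := map[string][]string{
		"a": {"b"},
		"b": {"c"},
		"c": {"d"},
		"d": {"e"},
	}
	fmt.Println(boundedBFS(adj, "a", 3))
}
```

With a depth of 3, `e` is four hops from `a` and is never recorded, which matches the `CHECK(distance IN (1, 2, 3))` constraint added to `recs.follow_distances`.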
6666+6767+func (e *Engine) WriteFollowDistances(ctx context.Context, distances []followDistance) error {
868 tx, err := e.db.BeginTx(ctx, nil)
969 if err != nil {
1070 return err
···1575 return err
1676 }
17771818- _, err = tx.ExecContext(ctx, `
1919- INSERT INTO recs.follow_distances (user_a, user_b, distance)
2020- SELECT user_a, user_b, MIN(distance) FROM (
2121- SELECT user_did AS user_a, target_did AS user_b, 1 AS distance FROM main.follows WHERE user_did != target_did
2222- UNION ALL
2323- SELECT f1.user_did, f2.target_did, 2
2424- FROM main.follows f1
2525- JOIN main.follows f2 ON f1.target_did = f2.user_did
2626- WHERE f1.user_did != f2.target_did
2727- ) GROUP BY user_a, user_b
2828- `)
7878+ stmt, err := tx.PrepareContext(ctx, `INSERT INTO recs.follow_distances (user_a, user_b, distance) VALUES (?, ?, ?)`)
2979 if err != nil {
3080 return err
3181 }
8282+ defer stmt.Close()
32833333- e.logger.Info("follow distances computed")
8484+ for _, d := range distances {
8585+ if _, err := stmt.ExecContext(ctx, d.userA, d.userB, d.distance); err != nil {
8686+ return err
8787+ }
8888+ }
8989+9090+ e.logger.Info("follow distances computed", "pairs", len(distances))
3491 return tx.Commit()
3592}
36933737-func (e *Engine) ComputeFollowDistancesIncremental(ctx context.Context) error {
3838- var maxFollowed string
3939- err := e.db.QueryRowContext(ctx, `
4040- SELECT COALESCE(MAX(followed_at), '1970-01-01') FROM main.follows
4141- `).Scan(&maxFollowed)
9494+// ComputeFollowDistances incrementally recomputes follow distances for users
9595+// whose follows changed since the last run, as tracked by the follows_dirty column.
9696+func (e *Engine) ComputeFollowDistances(ctx context.Context) error {
9797+ rows, err := e.db.QueryContext(ctx, `SELECT did FROM main.users WHERE follows_dirty = 1`)
4298 if err != nil {
4399 return err
44100 }
451014646- var lastComputed string
4747- err = e.db.QueryRowContext(ctx, `
4848- SELECT COALESCE(MAX(computed_at), '1970-01-01') FROM recs.user_similarity
4949- `).Scan(&lastComputed)
102102+ var dirtyUsers []string
103103+ for rows.Next() {
104104+ var did string
105105+ if err := rows.Scan(&did); err != nil {
106106+ rows.Close()
107107+ return err
108108+ }
109109+ dirtyUsers = append(dirtyUsers, did)
110110+ }
111111+ rows.Close()
112112+113113+ if len(dirtyUsers) == 0 {
114114+ return nil
115115+ }
116116+117117+ distances, err := e.ComputeFollowDistancesData(ctx, dirtyUsers)
50118 if err != nil {
51119 return err
52120 }
531215454- if maxFollowed <= lastComputed {
5555- return nil
122122+ tx, err := e.db.BeginTx(ctx, nil)
123123+ if err != nil {
124124+ return err
56125 }
126126+ defer func() { _ = tx.Rollback() }()
571275858- return e.ComputeFollowDistances(ctx)
128128+ ph := make([]string, len(dirtyUsers))
129129+ args := make([]any, len(dirtyUsers))
130130+ for i, did := range dirtyUsers {
131131+ ph[i] = "?"
132132+ args[i] = did
133133+ }
134134+ if _, err := tx.ExecContext(ctx,
135135+ fmt.Sprintf("DELETE FROM recs.follow_distances WHERE user_a IN (%s)", joinPh(ph)),
136136+ args...,
137137+ ); err != nil {
138138+ return err
139139+ }
140140+141141+ stmt, err := tx.PrepareContext(ctx, `INSERT INTO recs.follow_distances (user_a, user_b, distance) VALUES (?, ?, ?)`)
142142+ if err != nil {
143143+ return err
144144+ }
145145+ defer stmt.Close()
146146+147147+ for _, d := range distances {
148148+ if _, err := stmt.ExecContext(ctx, d.userA, d.userB, d.distance); err != nil {
149149+ return err
150150+ }
151151+ }
152152+153153+ if _, err := tx.ExecContext(ctx,
154154+ fmt.Sprintf("UPDATE main.users SET follows_dirty = 0 WHERE did IN (%s)", joinPh(ph)),
155155+ args...,
156156+ ); err != nil {
157157+ return err
158158+ }
159159+160160+ e.logger.Info("follow distances computed", "users", len(dirtyUsers), "pairs", len(distances))
161161+ return tx.Commit()
59162}
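Review note: the `DELETE ... IN (%s)` and `UPDATE ... IN (%s)` statements above bind one parameter per dirty user, and `joinPh` is not shown in this diff. SQLite caps the number of bound parameters per statement (`SQLITE_MAX_VARIABLE_NUMBER`, 999 by default before 3.32.0 and 32766 afterwards), so a large first run, where every user starts with `follows_dirty = 1`, could exceed it depending on the build. A minimal sketch of batching under that assumption; `placeholders` and `chunk` are hypothetical helpers, not the PR's code.

```go
package main

import (
	"fmt"
	"strings"
)

// placeholders returns n comma-separated "?" markers, e.g. "?,?,?".
func placeholders(n int) string {
	if n <= 0 {
		return ""
	}
	return strings.TrimSuffix(strings.Repeat("?,", n), ",")
}

// chunk splits ids into consecutive batches of at most size elements.
func chunk(ids []string, size int) [][]string {
	var out [][]string
	for size > 0 && len(ids) > 0 {
		n := size
		if len(ids) < n {
			n = len(ids)
		}
		out = append(out, ids[:n])
		ids = ids[n:]
	}
	return out
}

func main() {
	ids := []string{"did:example:a", "did:example:b", "did:example:c"}
	for _, batch := range chunk(ids, 2) {
		q := fmt.Sprintf(
			"DELETE FROM recs.follow_distances WHERE user_a IN (%s)",
			placeholders(len(batch)))
		fmt.Println(q, batch)
	}
}
```

Each batch would run inside the same transaction, so the delete, insert, and flag reset still commit atomically.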
+11-5
internal/cluster/weights.go
···1111 minActionsTune = 5
1212)
13131414+// RewardSignal increases the weight of the given signal for a user. Only takes
1515+// effect after minActionsTune positive actions. Signal must be one of: "sub",
1616+// "like", "tag", "social", "pop", "category", "content".
1417func (e *Engine) RewardSignal(ctx context.Context, userDID string, signal string) {
1515- return
1618 e.adjustWeight(ctx, userDID, signal, 1.0)
1719}
18202121+// PenalizeSignal decreases the weight of the given signal for a user.
1922func (e *Engine) PenalizeSignal(ctx context.Context, userDID string, signal string) {
2020- return
2123 e.adjustWeight(ctx, userDID, signal, -1.0)
2224}
23252426func (e *Engine) adjustWeight(ctx context.Context, userDID string, signal string, delta float64) {
2527 var actedCount int
2628 _ = e.db.QueryRowContext(ctx, `
2727- SELECT COUNT(*) FROM recs.recommendation_impressions WHERE user_did = ? AND acted = 1
2929+ SELECT COUNT(*) FROM main.recommendation_impressions WHERE user_did = ? AND acted = 1
2830 `, userDID).Scan(&actedCount)
2931 if actedCount < minActionsTune {
3032 return
···35373638 if exists == 0 {
3739 _, _ = e.db.ExecContext(ctx, `
3838- INSERT INTO recs.user_signal_weights (user_did, w_sub, w_like, w_tag, w_social, w_pop, w_category)
3939- VALUES (?, 1.0, 0.5, 0.3, 0.7, 0.2, 0.4)
4040+ INSERT INTO recs.user_signal_weights (user_did, w_sub, w_like, w_tag, w_social, w_pop, w_category, w_content)
4141+ VALUES (?, 1.0, 0.5, 0.3, 0.7, 0.2, 0.4, 0.4)
4042 `, userDID)
4143 }
4244···6870 return "w_pop"
6971 case "category":
7072 return "w_category"
7373+ case "content":
7474+ return "w_content"
7175 default:
7276 return ""
7377 }
7478}
75798080+// GetDominantSignal returns the signal name with the highest weight.
7681func (e *Engine) GetDominantSignal(w SignalWeights) string {
7782 signals := map[string]float64{
7883 "sub": w.WSub,
···8186 "social": w.WSocial,
8287 "pop": w.WPop,
8388 "category": w.WCategory,
8989+ "content": w.WContent,
8490 }
85918692 var best string
+49-25
internal/db/db.go
···11package db
2233import (
44+ "context"
45 "database/sql"
56 "fmt"
67 "math"
···126127 return nil
127128}
128129130130+func (d *Databases) InitVecTables(dimension int) error {
131131+ if dimension <= 0 {
132132+ return nil
133133+ }
134134+ for _, stmt := range []string{
135135+ fmt.Sprintf(`CREATE VIRTUAL TABLE IF NOT EXISTS recs.feed_embeddings USING vec0(feed_url TEXT PRIMARY KEY, embedding float[%d])`, dimension),
136136+ fmt.Sprintf(`CREATE VIRTUAL TABLE IF NOT EXISTS recs.article_embeddings USING vec0(article_id INTEGER PRIMARY KEY, embedding float[%d])`, dimension),
137137+ } {
138138+ if _, err := d.db.ExecContext(context.Background(), stmt); err != nil {
139139+ return fmt.Errorf("create vec0 table: %w", err)
140140+ }
141141+ }
142142+ return nil
143143+}
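Review note: `InitVecTables` silently does nothing when `dimension <= 0`, so the value read from `GLEAN_EMBED_DIMENSION` decides whether the `vec0` tables exist at all. A minimal sketch of loading and validating the four embedding settings with the defaults listed in the README table; `embedConfig` and `loadEmbedConfig` are hypothetical names, and how the project actually parses its environment is not shown in this diff.

```go
package main

import (
	"fmt"
	"strconv"
)

// embedConfig holds the four GLEAN_EMBED_* settings.
type embedConfig struct {
	BaseURL   string
	APIKey    string
	Model     string
	Dimension int
}

// Enabled reports whether an embeddings endpoint was configured.
func (c embedConfig) Enabled() bool { return c.BaseURL != "" }

// loadEmbedConfig reads settings through getenv (os.Getenv in production,
// a map lookup in tests) and applies the documented defaults.
func loadEmbedConfig(getenv func(string) string) (embedConfig, error) {
	cfg := embedConfig{
		BaseURL:   getenv("GLEAN_EMBED_BASE_URL"),
		APIKey:    getenv("GLEAN_EMBED_API_KEY"),
		Model:     "text-embedding-3-small",
		Dimension: 1536,
	}
	if m := getenv("GLEAN_EMBED_MODEL"); m != "" {
		cfg.Model = m
	}
	if d := getenv("GLEAN_EMBED_DIMENSION"); d != "" {
		n, err := strconv.Atoi(d)
		if err != nil || n <= 0 {
			return cfg, fmt.Errorf("GLEAN_EMBED_DIMENSION must be a positive integer, got %q", d)
		}
		cfg.Dimension = n
	}
	return cfg, nil
}

func main() {
	env := map[string]string{"GLEAN_EMBED_BASE_URL": "https://api.openai.com/v1"}
	cfg, err := loadEmbedConfig(func(k string) string { return env[k] })
	fmt.Println(cfg.Enabled(), cfg.Model, cfg.Dimension, err)
}
```

Rejecting a malformed dimension at startup surfaces a misconfiguration early, instead of leaving recommendations to fall back to the non-embedding path without explanation.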
144144+129145func (d *Databases) DB() *sql.DB {
130146 return d.db.DB
131147}
···161177 `CREATE TABLE IF NOT EXISTS users (
162178 did TEXT PRIMARY KEY,
163179 indexed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
164164- updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
180180+ updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
181181+ follows_dirty BOOLEAN NOT NULL DEFAULT 1
165182 )`,
166183167184 `CREATE TABLE IF NOT EXISTS follows (
···189206 `CREATE INDEX IF NOT EXISTS idx_follows_target ON follows(target_did)`,
190207 `CREATE INDEX IF NOT EXISTS idx_follows_uri ON follows(uri)`,
191208 `CREATE INDEX IF NOT EXISTS idx_follows_followed_at ON follows(followed_at)`,
209209+210210+ `CREATE TABLE IF NOT EXISTS dismissed_recommendations (
211211+ user_did TEXT NOT NULL,
212212+ target_type TEXT NOT NULL CHECK(target_type IN ('feed', 'article')),
213213+ target_id TEXT NOT NULL,
214214+ reason TEXT,
215215+ dismissed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
216216+ PRIMARY KEY (user_did, target_type, target_id)
217217+ )`,
218218+219219+ `CREATE TABLE IF NOT EXISTS recommendation_impressions (
220220+ user_did TEXT NOT NULL,
221221+ target_type TEXT NOT NULL CHECK(target_type IN ('feed', 'article')),
222222+ target_id TEXT NOT NULL,
223223+ first_shown_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
224224+ last_shown_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
225225+ shown_count INTEGER NOT NULL DEFAULT 1,
226226+ acted BOOLEAN NOT NULL DEFAULT 0,
227227+ PRIMARY KEY (user_did, target_type, target_id)
228228+ )`,
229229+230230+ `CREATE INDEX IF NOT EXISTS idx_dismissed_user_type ON dismissed_recommendations(user_did, target_type)`,
231231+ `CREATE INDEX IF NOT EXISTS idx_impressions_user_unacted ON recommendation_impressions(user_did, acted, shown_count)`,
232232+ `CREATE INDEX IF NOT EXISTS idx_impressions_last_shown ON recommendation_impressions(last_shown_at)`,
192233}
193234194235var articlesSchema = []string{
···318359 CHECK(user_a < user_b)
319360 )`,
320361321321- `CREATE TABLE IF NOT EXISTS recs.dismissed_recommendations (
322322- user_did TEXT NOT NULL,
323323- target_type TEXT NOT NULL CHECK(target_type IN ('feed', 'article')),
324324- target_id TEXT NOT NULL,
325325- reason TEXT,
326326- dismissed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
327327- PRIMARY KEY (user_did, target_type, target_id)
328328- )`,
329329-330330- `CREATE TABLE IF NOT EXISTS recs.recommendation_impressions (
331331- user_did TEXT NOT NULL,
332332- target_type TEXT NOT NULL CHECK(target_type IN ('feed', 'article')),
333333- target_id TEXT NOT NULL,
334334- first_shown_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
335335- last_shown_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
336336- shown_count INTEGER NOT NULL DEFAULT 1,
337337- acted BOOLEAN NOT NULL DEFAULT 0,
338338- PRIMARY KEY (user_did, target_type, target_id)
339339- )`,
340340-341362 `CREATE TABLE IF NOT EXISTS recs.follow_distances (
342363 user_a TEXT NOT NULL,
343364 user_b TEXT NOT NULL,
344344- distance INTEGER NOT NULL CHECK(distance IN (1, 2)),
365365+ distance INTEGER NOT NULL CHECK(distance IN (1, 2, 3)),
345366 PRIMARY KEY (user_a, user_b)
346367 )`,
347368···353374 w_social REAL NOT NULL DEFAULT 0.7,
354375 w_pop REAL NOT NULL DEFAULT 0.2,
355376 w_category REAL NOT NULL DEFAULT 0.4,
377377+ w_content REAL NOT NULL DEFAULT 0.4,
356378 updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
357379 )`,
358380···364386 updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
365387 )`,
366388367367- `CREATE INDEX IF NOT EXISTS recs.idx_dismissed_user_type ON dismissed_recommendations(user_did, target_type)`,
368368- `CREATE INDEX IF NOT EXISTS recs.idx_impressions_user_unacted ON recommendation_impressions(user_did, acted, shown_count)`,
369369- `CREATE INDEX IF NOT EXISTS recs.idx_impressions_last_shown ON recommendation_impressions(last_shown_at)`,
389389+ `CREATE TABLE IF NOT EXISTS recs.feed_embedding_meta (
390390+ feed_url TEXT PRIMARY KEY,
391391+ source_text TEXT NOT NULL DEFAULT ''
392392+ )`,
393393+370394 `CREATE INDEX IF NOT EXISTS recs.idx_follow_distances_b ON follow_distances(user_b)`,
371395 `CREATE INDEX IF NOT EXISTS recs.idx_follow_distances_a_dist ON follow_distances(user_a, distance)`,
372396 `CREATE INDEX IF NOT EXISTS recs.idx_user_similarity_b ON user_similarity(user_b)`,
+22-1
internal/db/follow.go
···2222 uri = excluded.uri,
2323 cid = excluded.cid
2424 `, userDID, targetDID, nilIfEmpty(uri), nilIfEmpty(cid))
2525+ if err != nil {
2626+ return err
2727+ }
2828+ _, err = s.db.ExecContext(ctx, `UPDATE users SET follows_dirty = 1 WHERE did = ?`, userDID)
2529 return err
2630}
27312832func (s *UserStore) DeleteFollow(ctx context.Context, userDID, targetDID string) error {
2933 _, err := s.db.ExecContext(ctx, `DELETE FROM follows WHERE user_did = ? AND target_did = ?`, userDID, targetDID)
3434+ if err != nil {
3535+ return err
3636+ }
3737+ _, err = s.db.ExecContext(ctx, `UPDATE users SET follows_dirty = 1 WHERE did = ?`, userDID)
3038 return err
3139}
32403341func (s *UserStore) DeleteFollowByURI(ctx context.Context, uri string) error {
3434- _, err := s.db.ExecContext(ctx, `DELETE FROM follows WHERE uri = ?`, uri)
4242+ _, err := s.db.ExecContext(ctx, `UPDATE users SET follows_dirty = 1 WHERE did IN (SELECT user_did FROM follows WHERE uri = ?)`, uri)
4343+ if err != nil {
4444+ return err
4545+ }
4646+ _, err = s.db.ExecContext(ctx, `DELETE FROM follows WHERE uri = ?`, uri)
3547 return err
3648}
3749···139151 }
140152 rows.Close()
141153154154+ var changed bool
142155 for targetDID := range existing {
143156 if _, ok := activeFollows[targetDID]; !ok {
144157 if _, err := tx.ExecContext(ctx, `DELETE FROM follows WHERE user_did = ? AND target_did = ?`, userDID, targetDID); err != nil {
145158 return err
146159 }
160160+ changed = true
147161 }
148162 }
149163···163177 if err != nil {
164178 return err
165179 }
180180+ changed = true
181181+ }
182182+ }
183183+184184+ if changed {
185185+ if _, err := tx.ExecContext(ctx, `UPDATE users SET follows_dirty = 1 WHERE did = ?`, userDID); err != nil {
186186+ return err
166187 }
167188 }
168189
···1616- OPML import and export
1717- Sign in with Bluesky / Atmosphere account — no new account needed
18181919+## How recommendations work
2020+2121+To suggest feeds and people you might enjoy, Glean looks at what you and other users subscribe to, read, and like.
2222+2323+**Feed suggestions** come from readers who share your subscriptions. If many people who follow the same blogs as you also follow a blog you haven't subscribed to, that blog shows up as a recommendation. The system also considers which articles you've liked, whether people you follow on Bluesky subscribe to the feed, and how popular the feed is overall.
2424+2525+**People suggestions** are readers whose subscriptions overlap with yours. The more feeds you share, the higher they rank. You also see whether you have any Bluesky follows in common.
2626+2727+**Dismissals** keep things tidy. If you dismiss a recommendation, it won't come back. If a suggestion sits ignored for more than 5 days, it's automatically removed so newer recommendations can take its place.
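2828+2929+**Cold start.** If you're new and have fewer than five subscriptions, Glean first looks for feeds whose content resembles the ones you already subscribe to (when embeddings are configured). If that finds nothing, it shows feeds from people you follow on Bluesky alongside popular feeds from the community, so there's something to explore right away.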
3030+3131+The system improves over time: as you subscribe to feeds and like articles, Glean learns which signals matter most to you and adjusts accordingly.
3232+1933## Self-hosting
20342135### Docker
···5165| `GLEAN_PLC_URL` | `https://didplc.glean.at` | PLC directory URL for DID resolution |
5266| `GLEAN_OAUTH_CLIENT_ID` | _(empty)_ | OAuth client metadata URL (leave empty for localhost dev) |
5367| `GLEAN_OAUTH_REDIRECT_URL` | _(empty)_ | OAuth redirect URL (leave empty for localhost dev) |
6868+| `GLEAN_EMBED_BASE_URL` | _(empty)_ | Embeddings API base URL (recommended, see below) |
6969+| `GLEAN_EMBED_API_KEY` | _(empty)_ | API key for the embeddings endpoint |
7070+| `GLEAN_EMBED_MODEL` | `text-embedding-3-small` | Embedding model name |
7171+| `GLEAN_EMBED_DIMENSION` | `1536` | Embedding vector dimension |
54725573For production:
5674