A social RSS reader built on the AT Protocol. glean.at
glean atproto atmosphere rss feed social app
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add Bluesky and Tangled social graph support

+462 -5
+5 -4
internal/atproto/firehose.go
··· 40 40 handler: handler, 41 41 logger: logger, 42 42 collections: map[string]bool{ 43 - "at.glean.subscription": true, 44 - "at.glean.annotation": true, 45 - "at.glean.like": true, 46 - // TODO: support at.margin.annotation as well (ref: https://tangled.org/did:plc:rgvlxa3ecwx3bfyzlrzrwtrs/issues/1) 43 + "at.glean.subscription": true, 44 + "at.glean.annotation": true, 45 + "at.glean.like": true, 46 + "app.bsky.graph.follow": true, 47 + "sh.tangled.graph.follow": true, 47 48 }, 48 49 } 49 50 }
+20
internal/atproto/firehose_handler.go
··· 27 27 return h.handleLike(ctx, event) 28 28 case "at.glean.annotation": 29 29 return h.handleAnnotation(ctx, event) 30 + case "app.bsky.graph.follow", "sh.tangled.graph.follow": 31 + return h.handleFollow(ctx, event) 30 32 } 31 33 return nil 32 34 } ··· 135 137 } 136 138 return nil 137 139 } 140 + 141 + func (h *FirehoseDBHandler) handleFollow(ctx context.Context, event *FirehoseEvent) error { 142 + switch event.Type { 143 + case "create": 144 + var rec FollowRecord 145 + if err := json.Unmarshal(event.Value, &rec); err != nil { 146 + return err 147 + } 148 + if rec.Subject == "" { 149 + return nil 150 + } 151 + return h.db.UpsertFollow(ctx, event.DID, rec.Subject, event.URI, event.CID) 152 + 153 + case "delete": 154 + return h.db.DeleteFollowByURI(ctx, event.URI) 155 + } 156 + return nil 157 + }
+5
internal/atproto/lexicon.go
··· 28 28 ArticleURL string `json:"articleUrl"` 29 29 } 30 30 31 + type FollowRecord struct { 32 + Subject string `json:"subject"` 33 + CreatedAt string `json:"createdAt"` 34 + } 35 + 31 36 type Record struct { 32 37 URI string 33 38 CID string
+50
internal/atproto/sync.go
··· 31 31 if err := s.syncCollection(ctx, userDID, "at.glean.annotation", s.reconcileAnnotation); err != nil { 32 32 s.logger.Error("sync annotations failed", "error", err, "did", userDID) 33 33 } 34 + if err := s.syncFollows(ctx, userDID); err != nil { 35 + s.logger.Error("sync follows failed", "error", err, "did", userDID) 36 + } 34 37 35 38 return nil 36 39 } ··· 148 151 } 149 152 return s.db.CreateAnnotation(ctx, a) 150 153 } 154 + 155 + func (s *Sync) syncFollows(ctx context.Context, userDID string) error { 156 + activeFollows := make(map[string]db.Follow) 157 + 158 + for _, collection := range []string{"app.bsky.graph.follow", "sh.tangled.graph.follow"} { 159 + cursor := "" 160 + for { 161 + records, next, err := s.client.ListRecords(ctx, userDID, collection, 100, cursor) 162 + if err != nil { 163 + return err 164 + } 165 + 166 + for _, r := range records { 167 + var rec FollowRecord 168 + if err := json.Unmarshal(r.Value, &rec); err != nil { 169 + continue 170 + } 171 + if rec.Subject == "" { 172 + continue 173 + } 174 + 175 + t, _ := time.Parse(time.RFC3339, rec.CreatedAt) 176 + activeFollows[rec.Subject] = db.Follow{ 177 + URI: db.NullStr(r.URI), 178 + CID: db.NullStr(r.CID), 179 + FollowedAt: db.NullTime(t), 180 + } 181 + 182 + var handle string 183 + if rec.Subject != userDID { 184 + s.db.CreateUser(ctx, rec.Subject, handle, "", "") 185 + } 186 + } 187 + 188 + if next == "" || len(records) == 0 { 189 + break 190 + } 191 + cursor = next 192 + } 193 + } 194 + 195 + if len(activeFollows) == 0 { 196 + return nil 197 + } 198 + 199 + return s.db.SyncFollows(ctx, userDID, activeFollows) 200 + }
+15
internal/cluster/jaccard.go
··· 120 120 return err 121 121 } 122 122 123 + _, err = tx.ExecContext(ctx, ` 124 + INSERT INTO user_similarity (user_a, user_b, jaccard, common_feeds) 125 + SELECT 126 + MIN(f.user_did, f.target_did), 127 + MAX(f.user_did, f.target_did), 128 + 0.5, 129 + 0 130 + FROM follows f 131 + ON CONFLICT(user_a, user_b) DO UPDATE SET 132 + jaccard = jaccard + 0.5 133 + `) 134 + if err != nil { 135 + return err 136 + } 137 + 123 138 e.logger.Info("user similarity computed") 124 139 return tx.Commit() 125 140 }
+10
internal/db/db.go
··· 169 169 computed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, 170 170 PRIMARY KEY (user_did, feed_url, article_url) 171 171 )`, 172 + `CREATE TABLE IF NOT EXISTS follows ( 173 + user_did TEXT NOT NULL REFERENCES users(did), 174 + target_did TEXT NOT NULL, 175 + uri TEXT, 176 + cid TEXT, 177 + followed_at DATETIME, 178 + PRIMARY KEY (user_did, target_did) 179 + )`, 172 180 `CREATE TABLE IF NOT EXISTS oauth_auth_requests ( 173 181 state TEXT PRIMARY KEY, 174 182 data TEXT NOT NULL ··· 187 195 `CREATE INDEX IF NOT EXISTS idx_annotations_article ON annotations(article_url)`, 188 196 `CREATE INDEX IF NOT EXISTS idx_likes_article ON likes(feed_url, article_url)`, 189 197 `CREATE INDEX IF NOT EXISTS idx_likes_author ON likes(author_did)`, 198 + `CREATE INDEX IF NOT EXISTS idx_follows_user ON follows(user_did)`, 199 + `CREATE INDEX IF NOT EXISTS idx_follows_target ON follows(target_did)`, 190 200 }
+171
internal/db/follow.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "time" 7 + ) 8 + 9 + type Follow struct { 10 + UserDID string 11 + TargetDID string 12 + URI sql.NullString 13 + CID sql.NullString 14 + FollowedAt sql.NullTime 15 + } 16 + 17 + func (db *DB) UpsertFollow(ctx context.Context, userDID, targetDID, uri, cid string) error { 18 + _, err := db.ExecContext(ctx, ` 19 + INSERT INTO follows (user_did, target_did, uri, cid, followed_at) 20 + VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP) 21 + ON CONFLICT(user_did, target_did) DO UPDATE SET 22 + uri = excluded.uri, 23 + cid = excluded.cid 24 + `, userDID, targetDID, uriOrNil("", uri), uriOrNil("", cid)) 25 + return err 26 + } 27 + 28 + func (db *DB) DeleteFollow(ctx context.Context, userDID, targetDID string) error { 29 + _, err := db.ExecContext(ctx, `DELETE FROM follows WHERE user_did = ? AND target_did = ?`, userDID, targetDID) 30 + return err 31 + } 32 + 33 + func (db *DB) DeleteFollowByURI(ctx context.Context, uri string) error { 34 + _, err := db.ExecContext(ctx, `DELETE FROM follows WHERE uri = ?`, uri) 35 + return err 36 + } 37 + 38 + func (db *DB) ListFollows(ctx context.Context, userDID string, limit, offset int) ([]*Follow, error) { 39 + rows, err := db.QueryContext(ctx, ` 40 + SELECT user_did, target_did, uri, cid, followed_at 41 + FROM follows WHERE user_did = ? 42 + ORDER BY followed_at DESC 43 + LIMIT ? OFFSET ? 44 + `, userDID, limit, offset) 45 + if err != nil { 46 + return nil, err 47 + } 48 + defer rows.Close() 49 + 50 + var follows []*Follow 51 + for rows.Next() { 52 + f := &Follow{} 53 + if err := rows.Scan(&f.UserDID, &f.TargetDID, &f.URI, &f.CID, &f.FollowedAt); err != nil { 54 + return nil, err 55 + } 56 + follows = append(follows, f) 57 + } 58 + return follows, rows.Err() 59 + } 60 + 61 + func (db *DB) ListFollowers(ctx context.Context, targetDID string, limit, offset int) ([]*Follow, error) { 62 + rows, err := db.QueryContext(ctx, ` 63 + SELECT user_did, target_did, uri, cid, followed_at 64 + FROM follows WHERE target_did = ? 65 + ORDER BY followed_at DESC 66 + LIMIT ? OFFSET ? 67 + `, targetDID, limit, offset) 68 + if err != nil { 69 + return nil, err 70 + } 71 + defer rows.Close() 72 + 73 + var follows []*Follow 74 + for rows.Next() { 75 + f := &Follow{} 76 + if err := rows.Scan(&f.UserDID, &f.TargetDID, &f.URI, &f.CID, &f.FollowedAt); err != nil { 77 + return nil, err 78 + } 79 + follows = append(follows, f) 80 + } 81 + return follows, rows.Err() 82 + } 83 + 84 + func (db *DB) IsFollowing(ctx context.Context, userDID, targetDID string) (bool, error) { 85 + var exists int 86 + err := db.QueryRowContext(ctx, ` 87 + SELECT 1 FROM follows WHERE user_did = ? AND target_did = ? 88 + `, userDID, targetDID).Scan(&exists) 89 + if err == sql.ErrNoRows { 90 + return false, nil 91 + } 92 + if err != nil { 93 + return false, err 94 + } 95 + return true, nil 96 + } 97 + 98 + func (db *DB) GetFollowDIDs(ctx context.Context, userDID string) ([]string, error) { 99 + rows, err := db.QueryContext(ctx, ` 100 + SELECT target_did FROM follows WHERE user_did = ? 101 + `, userDID) 102 + if err != nil { 103 + return nil, err 104 + } 105 + defer rows.Close() 106 + 107 + var dids []string 108 + for rows.Next() { 109 + var did string 110 + if err := rows.Scan(&did); err != nil { 111 + return nil, err 112 + } 113 + dids = append(dids, did) 114 + } 115 + return dids, rows.Err() 116 + } 117 + 118 + func (db *DB) SyncFollows(ctx context.Context, userDID string, activeFollows map[string]Follow) error { 119 + tx, err := db.BeginTx(ctx, nil) 120 + if err != nil { 121 + return err 122 + } 123 + defer func() { _ = tx.Rollback() }() 124 + 125 + rows, err := tx.QueryContext(ctx, `SELECT target_did, uri, cid, followed_at FROM follows WHERE user_did = ?`, userDID) 126 + if err != nil { 127 + return err 128 + } 129 + 130 + existing := make(map[string]bool) 131 + for rows.Next() { 132 + var targetDID string 133 + var uri, cid sql.NullString 134 + var followedAt sql.NullTime 135 + if err := rows.Scan(&targetDID, &uri, &cid, &followedAt); err != nil { 136 + rows.Close() 137 + return err 138 + } 139 + existing[targetDID] = true 140 + } 141 + rows.Close() 142 + 143 + for targetDID := range existing { 144 + if _, ok := activeFollows[targetDID]; !ok { 145 + if _, err := tx.ExecContext(ctx, `DELETE FROM follows WHERE user_did = ? AND target_did = ?`, userDID, targetDID); err != nil { 146 + return err 147 + } 148 + } 149 + } 150 + 151 + for targetDID, f := range activeFollows { 152 + if !existing[targetDID] { 153 + var followedAt any 154 + if f.FollowedAt.Valid { 155 + followedAt = f.FollowedAt.Time 156 + } else { 157 + followedAt = time.Now() 158 + } 159 + _, err := tx.ExecContext(ctx, ` 160 + INSERT INTO follows (user_did, target_did, uri, cid, followed_at) 161 + VALUES (?, ?, ?, ?, ?) 162 + ON CONFLICT(user_did, target_did) DO UPDATE SET uri = excluded.uri, cid = excluded.cid 163 + `, userDID, targetDID, f.URI, f.CID, followedAt) 164 + if err != nil { 165 + return err 166 + } 167 + } 168 + } 169 + 170 + return tx.Commit() 171 + }
+184
internal/db/follow_test.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "testing" 6 + 7 + "gotest.tools/v3/assert" 8 + ) 9 + 10 + func seedFollowData(t *testing.T, ctx context.Context, db *DB) (userDID, targetDID string) { 11 + t.Helper() 12 + 13 + userDID = "did:test:follower" 14 + targetDID = "did:test:followed" 15 + 16 + _, err := db.ExecContext(ctx, `INSERT INTO users (did, handle) VALUES (?, ?)`, userDID, "follower") 17 + assert.NilError(t, err) 18 + _, err = db.ExecContext(ctx, `INSERT INTO users (did, handle) VALUES (?, ?)`, targetDID, "followed") 19 + assert.NilError(t, err) 20 + 21 + return userDID, targetDID 22 + } 23 + 24 + func TestUpsertFollow(t *testing.T) { 25 + ctx := context.Background() 26 + db := setupTestDB(t) 27 + userDID, targetDID := seedFollowData(t, ctx, db) 28 + 29 + err := db.UpsertFollow(ctx, userDID, targetDID, "at://did:test:follower/app.bsky.graph.follow/123", "cid123") 30 + assert.NilError(t, err) 31 + 32 + following, err := db.IsFollowing(ctx, userDID, targetDID) 33 + assert.NilError(t, err) 34 + assert.Equal(t, following, true) 35 + } 36 + 37 + func TestIsFollowing_NotFollowing(t *testing.T) { 38 + ctx := context.Background() 39 + db := setupTestDB(t) 40 + userDID, targetDID := seedFollowData(t, ctx, db) 41 + 42 + following, err := db.IsFollowing(ctx, userDID, targetDID) 43 + assert.NilError(t, err) 44 + assert.Equal(t, following, false) 45 + } 46 + 47 + func TestDeleteFollow(t *testing.T) { 48 + ctx := context.Background() 49 + db := setupTestDB(t) 50 + userDID, targetDID := seedFollowData(t, ctx, db) 51 + 52 + err := db.UpsertFollow(ctx, userDID, targetDID, "at://uri", "cid") 53 + assert.NilError(t, err) 54 + 55 + err = db.DeleteFollow(ctx, userDID, targetDID) 56 + assert.NilError(t, err) 57 + 58 + following, err := db.IsFollowing(ctx, userDID, targetDID) 59 + assert.NilError(t, err) 60 + assert.Equal(t, following, false) 61 + } 62 + 63 + func TestDeleteFollowByURI(t *testing.T) { 64 + ctx := context.Background() 65 + db := setupTestDB(t) 66 + userDID, targetDID := seedFollowData(t, ctx, db) 67 + 68 + uri := "at://did:test:follower/app.bsky.graph.follow/abc" 69 + err := db.UpsertFollow(ctx, userDID, targetDID, uri, "cid") 70 + assert.NilError(t, err) 71 + 72 + err = db.DeleteFollowByURI(ctx, uri) 73 + assert.NilError(t, err) 74 + 75 + following, err := db.IsFollowing(ctx, userDID, targetDID) 76 + assert.NilError(t, err) 77 + assert.Equal(t, following, false) 78 + } 79 + 80 + func TestListFollows(t *testing.T) { 81 + ctx := context.Background() 82 + db := setupTestDB(t) 83 + userDID, _ := seedFollowData(t, ctx, db) 84 + 85 + target2 := "did:test:followed2" 86 + _, err := db.ExecContext(ctx, `INSERT INTO users (did, handle) VALUES (?, ?)`, target2, "followed2") 87 + assert.NilError(t, err) 88 + 89 + err = db.UpsertFollow(ctx, userDID, "did:test:followed", "uri1", "cid1") 90 + assert.NilError(t, err) 91 + err = db.UpsertFollow(ctx, userDID, target2, "uri2", "cid2") 92 + assert.NilError(t, err) 93 + 94 + follows, err := db.ListFollows(ctx, userDID, 10, 0) 95 + assert.NilError(t, err) 96 + assert.Equal(t, len(follows), 2) 97 + } 98 + 99 + func TestListFollowers(t *testing.T) { 100 + ctx := context.Background() 101 + db := setupTestDB(t) 102 + _, targetDID := seedFollowData(t, ctx, db) 103 + 104 + follower2 := "did:test:follower2" 105 + _, err := db.ExecContext(ctx, `INSERT INTO users (did, handle) VALUES (?, ?)`, follower2, "follower2") 106 + assert.NilError(t, err) 107 + 108 + err = db.UpsertFollow(ctx, "did:test:follower", targetDID, "uri1", "cid1") 109 + assert.NilError(t, err) 110 + err = db.UpsertFollow(ctx, follower2, targetDID, "uri2", "cid2") 111 + assert.NilError(t, err) 112 + 113 + followers, err := db.ListFollowers(ctx, targetDID, 10, 0) 114 + assert.NilError(t, err) 115 + assert.Equal(t, len(followers), 2) 116 + } 117 + 118 + func TestGetFollowDIDs(t *testing.T) { 119 + ctx := context.Background() 120 + db := setupTestDB(t) 121 + userDID, _ := seedFollowData(t, ctx, db) 122 + 123 + target2 := "did:test:followed2" 124 + _, err := db.ExecContext(ctx, `INSERT INTO users (did, handle) VALUES (?, ?)`, target2, "followed2") 125 + assert.NilError(t, err) 126 + 127 + err = db.UpsertFollow(ctx, userDID, "did:test:followed", "uri1", "cid1") 128 + assert.NilError(t, err) 129 + err = db.UpsertFollow(ctx, userDID, target2, "uri2", "cid2") 130 + assert.NilError(t, err) 131 + 132 + dids, err := db.GetFollowDIDs(ctx, userDID) 133 + assert.NilError(t, err) 134 + assert.Equal(t, len(dids), 2) 135 + } 136 + 137 + func TestSyncFollows_AddsNewRemovesStale(t *testing.T) { 138 + ctx := context.Background() 139 + db := setupTestDB(t) 140 + userDID, _ := seedFollowData(t, ctx, db) 141 + 142 + err := db.UpsertFollow(ctx, userDID, "did:test:old", "old-uri", "old-cid") 143 + assert.NilError(t, err) 144 + 145 + activeFollows := map[string]Follow{ 146 + "did:test:new1": {URI: NullStr("uri1"), CID: NullStr("cid1")}, 147 + "did:test:new2": {URI: NullStr("uri2"), CID: NullStr("cid2")}, 148 + } 149 + 150 + err = db.SyncFollows(ctx, userDID, activeFollows) 151 + assert.NilError(t, err) 152 + 153 + stillFollowing, err := db.IsFollowing(ctx, userDID, "did:test:old") 154 + assert.NilError(t, err) 155 + assert.Equal(t, stillFollowing, false) 156 + 157 + following1, err := db.IsFollowing(ctx, userDID, "did:test:new1") 158 + assert.NilError(t, err) 159 + assert.Equal(t, following1, true) 160 + 161 + following2, err := db.IsFollowing(ctx, userDID, "did:test:new2") 162 + assert.NilError(t, err) 163 + assert.Equal(t, following2, true) 164 + 165 + dids, err := db.GetFollowDIDs(ctx, userDID) 166 + assert.NilError(t, err) 167 + assert.Equal(t, len(dids), 2) 168 + } 169 + 170 + func TestUpsertFollow_Idempotent(t *testing.T) { 171 + ctx := context.Background() 172 + db := setupTestDB(t) 173 + userDID, targetDID := seedFollowData(t, ctx, db) 174 + 175 + err := db.UpsertFollow(ctx, userDID, targetDID, "uri1", "cid1") 176 + assert.NilError(t, err) 177 + err = db.UpsertFollow(ctx, userDID, targetDID, "uri2", "cid2") 178 + assert.NilError(t, err) 179 + 180 + follows, err := db.ListFollows(ctx, userDID, 10, 0) 181 + assert.NilError(t, err) 182 + assert.Equal(t, len(follows), 1) 183 + assert.Equal(t, follows[0].URI.String, "uri2") 184 + }
+2 -1
internal/db/social.go
··· 223 223 FROM user_similarity us 224 224 WHERE us.user_a = ? OR us.user_b = ? 225 225 UNION SELECT ? 226 + UNION SELECT f.target_did FROM follows f WHERE f.user_did = ? 226 227 ) 227 228 GROUP BY ar.id 228 229 ORDER BY like_count DESC, annotation_count DESC 229 230 LIMIT %d OFFSET %d 230 - `, limit, offset), since, since, userDID, userDID, userDID, userDID) 231 + `, limit, offset), since, since, userDID, userDID, userDID, userDID, userDID) 231 232 if err != nil { 232 233 return nil, err 233 234 }