A social RSS reader built on the AT Protocol. glean.at
glean atproto atmosphere rss feed social app
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

Handle errors better in sync and cleanup stream handler

+57 -85
+1 -1
go.mod
··· 11 11 github.com/prometheus/client_golang v1.19.1 12 12 go.uber.org/atomic v1.11.0 13 13 golang.org/x/net v0.53.0 14 + golang.org/x/sync v0.20.0 14 15 gotest.tools/v3 v3.5.2 15 16 ) 16 17 ··· 42 43 gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b // indirect 43 44 gitlab.com/yawning/tuplehash v0.0.0-20230713102510-df83abbf9a02 // indirect 44 45 golang.org/x/crypto v0.50.0 // indirect 45 - golang.org/x/sync v0.20.0 // indirect 46 46 golang.org/x/sys v0.43.0 // indirect 47 47 golang.org/x/text v0.36.0 // indirect 48 48 golang.org/x/time v0.5.0 // indirect
+19 -35
internal/atproto/stream_handler.go
··· 11 11 "pkg.rbrt.fr/glean/internal/db" 12 12 ) 13 13 14 + var sentinelErrors = []error{db.ErrDuplicateSubscription, db.ErrDuplicateLike} 15 + 16 + func isSentinel(err error) bool { 17 + for _, s := range sentinelErrors { 18 + if errors.Is(err, s) { 19 + return true 20 + } 21 + } 22 + return false 23 + } 24 + 14 25 type StreamDBHandler struct { 15 26 articles *db.DB 16 27 users *db.DB ··· 50 61 return nil 51 62 } 52 63 53 - existing, err := h.articles.GetSubscription(ctx, event.DID, rec.FeedURL) 54 - if err == nil && existing != nil { 55 - if !existing.URI.Valid || existing.URI.String == "" { 56 - return h.articles.UpdateSubscriptionURI(ctx, event.DID, rec.FeedURL, event.URI, event.CID) 57 - } 58 - return nil 59 - } 60 - 61 - f := &db.Feed{FeedURL: rec.FeedURL, Title: db.NullStr(rec.Title)} 62 - _ = h.articles.UpsertFeed(ctx, f) 63 - err = h.articles.CreateSubscription(ctx, event.DID, rec.FeedURL, rec.Title, rec.Category, event.URI, event.CID) 64 - if errors.Is(err, db.ErrDuplicateSubscription) { 64 + _ = h.articles.UpsertFeed(ctx, &db.Feed{FeedURL: rec.FeedURL, Title: db.NullStr(rec.Title)}) 65 + err := h.articles.CreateSubscription(ctx, event.DID, rec.FeedURL, rec.Title, rec.Category, event.URI, event.CID) 66 + if isSentinel(err) { 65 67 return nil 66 68 } 67 69 return err ··· 91 93 return nil 92 94 } 93 95 94 - exists, err := h.articles.HasLiked(ctx, event.DID, rec.FeedURL, rec.ArticleURL) 95 - if err != nil || exists { 96 - return nil 97 - } 98 - 99 96 t, _ := time.Parse(time.RFC3339, rec.CreatedAt) 100 - err = h.articles.CreateLike(ctx, &db.Like{ 97 + err := h.articles.CreateLike(ctx, &db.Like{ 101 98 URI: event.URI, 102 99 AuthorDID: event.DID, 103 100 FeedURL: rec.FeedURL, ··· 105 102 CreatedAt: sql.NullTime{Time: t, Valid: true}, 106 103 CID: sql.NullString{String: event.CID, Valid: event.CID != ""}, 107 104 }) 108 - if errors.Is(err, db.ErrDuplicateLike) { 105 + if isSentinel(err) { 109 106 return nil 110 107 } 111 108 return err ··· 213 210 return nil 214 211 } 215 212 216 - existing, err := h.articles.GetSubscription(ctx, event.DID, rec.FeedURL) 217 - if err == nil && existing != nil { 218 - if !existing.URI.Valid || existing.URI.String == "" { 219 - return h.articles.UpdateSubscriptionURI(ctx, event.DID, rec.FeedURL, event.URI, event.CID) 220 - } 221 - return nil 222 - } 223 - 224 - f := &db.Feed{ 225 - FeedURL: rec.FeedURL, 226 - Title: db.NullStr(rec.Title), 227 - SiteURL: db.NullStr(rec.SiteURL), 228 - } 229 - _ = h.articles.UpsertFeed(ctx, f) 230 - err = h.articles.CreateSubscription(ctx, event.DID, rec.FeedURL, rec.Title, rec.Category, event.URI, event.CID) 231 - if errors.Is(err, db.ErrDuplicateSubscription) { 213 + _ = h.articles.UpsertFeed(ctx, &db.Feed{FeedURL: rec.FeedURL, Title: db.NullStr(rec.Title), SiteURL: db.NullStr(rec.SiteURL)}) 214 + err := h.articles.CreateSubscription(ctx, event.DID, rec.FeedURL, rec.Title, "", event.URI, event.CID) 215 + if isSentinel(err) { 232 216 return nil 233 217 } 234 218 return err
+30 -35
internal/atproto/sync.go
··· 10 10 import ( 11 11 "context" 12 12 "encoding/json" 13 + "fmt" 13 14 "log/slog" 14 - "sync" 15 15 "time" 16 + 17 + "golang.org/x/sync/errgroup" 16 18 17 19 "pkg.rbrt.fr/glean/internal/db" 18 20 ) ··· 96 98 } 97 99 98 100 if len(feeds) > 0 { 99 - _ = s.articles.BatchUpsertFeeds(ctx, feeds) 101 + if err := s.articles.BatchUpsertFeeds(ctx, feeds); err != nil { 102 + return fmt.Errorf("upsert feeds: %w", err) 103 + } 100 104 } 101 105 return s.articles.BatchReconcileSubscriptions(ctx, userDID, subs) 102 106 } ··· 123 127 } 124 128 125 129 if len(feeds) > 0 { 126 - _ = s.articles.BatchUpsertFeeds(ctx, feeds) 130 + if err := s.articles.BatchUpsertFeeds(ctx, feeds); err != nil { 131 + return fmt.Errorf("failed to upsert feeds: %w", err) 132 + } 127 133 } 134 + 128 135 return s.articles.BatchReconcileSubscriptions(ctx, userDID, subs) 129 136 } 130 137 ··· 259 266 return nil 260 267 } 261 268 262 - type profileResult struct { 263 - did string 264 - handle string 265 - displayName string 266 - avatarURL string 269 + g, gCtx := errgroup.WithContext(ctx) 270 + g.SetLimit(10) 271 + 272 + dids := make([]string, 0, len(activeFollows)) 273 + profiles := make([]db.UserData, len(activeFollows)) 274 + for did := range activeFollows { 275 + dids = append(dids, did) 267 276 } 268 277 269 - var wg sync.WaitGroup 270 - sem := make(chan struct{}, 10) 271 - results := make([]profileResult, 0, len(activeFollows)) 272 - var mu sync.Mutex 273 - 274 - for targetDID := range activeFollows { 275 - sem <- struct{}{} 276 - wg.Add(1) 277 - go func(did string) { 278 - defer func() { <-sem }() 279 - defer wg.Done() 280 - var handle, displayName, avatarURL string 281 - if h, dn, avatar, err := FetchProfile(ctx, did); err == nil { 282 - handle = h 283 - displayName = dn 284 - avatarURL = avatar 278 + for i, did := range dids { 279 + g.Go(func() error { 280 + if h, dn, avatar, err := FetchProfile(gCtx, did); err == nil { 281 + profiles[i] = db.UserData{DID: did, Handle: h, DisplayName: dn, AvatarURL: avatar} 282 + } else { 283 + profiles[i] = db.UserData{DID: did} 285 284 } 286 - mu.Lock() 287 - results = append(results, profileResult{did, handle, displayName, avatarURL}) 288 - mu.Unlock() 289 - }(targetDID) 285 + return nil 286 + }) 290 287 } 291 - wg.Wait() 292 - 293 - var users []db.UserData 294 - for _, r := range results { 295 - users = append(users, db.UserData{DID: r.did, Handle: r.handle, DisplayName: r.displayName, AvatarURL: r.avatarURL}) 288 + if err := g.Wait(); err != nil { 289 + return fmt.Errorf("fetch profiles: %w", err) 296 290 } 297 - _ = s.users.BatchCreateUsers(ctx, users) 298 - 291 + if err := s.users.BatchCreateUsers(ctx, profiles); err != nil { 292 + return fmt.Errorf("batch create users: %w", err) 293 + } 299 294 return s.users.SyncFollows(ctx, userDID, activeFollows) 300 295 }
+6 -13
internal/db/feed.go
··· 114 114 return err 115 115 } 116 116 117 - func (db *DB) IncrementSubscriberCount(ctx context.Context, feedURL string) error { 118 - _, err := db.ExecContext(ctx, ` 119 - UPDATE feeds SET subscriber_count = subscriber_count + 1 WHERE feed_url = ? 120 - `, feedURL) 121 - return err 122 - } 123 - 124 - func (db *DB) DecrementSubscriberCount(ctx context.Context, feedURL string) error { 117 + func (db *DB) decrementSubscriberCount(ctx context.Context, feedURL string) error { 125 118 _, err := db.ExecContext(ctx, ` 126 119 UPDATE feeds SET subscriber_count = MAX(subscriber_count - 1, 0) WHERE feed_url = ? 127 120 `, feedURL) ··· 132 125 existing, err := db.GetSubscription(ctx, userDID, feedURL) 133 126 if err == nil && existing != nil { 134 127 if !existing.URI.Valid || existing.URI.String == "" { 135 - return db.UpdateSubscriptionURI(ctx, userDID, feedURL, uri, cid) 128 + return db.updateSubscriptionURI(ctx, userDID, feedURL, uri, cid) 136 129 } 137 130 return ErrDuplicateSubscription 138 131 } 139 132 return db.BatchReconcileSubscriptions(ctx, userDID, []SubData{{FeedURL: feedURL, Title: title, Category: category, URI: uri, CID: cid}}) 140 133 } 141 134 142 - func (db *DB) UpdateSubscriptionURI(ctx context.Context, userDID, feedURL, uri, cid string) error { 135 + func (db *DB) updateSubscriptionURI(ctx context.Context, userDID, feedURL, uri, cid string) error { 143 136 _, err := db.ExecContext(ctx, ` 144 137 UPDATE subscriptions SET uri = ?, cid = ? WHERE user_did = ? AND feed_url = ? 145 138 `, uri, cid, userDID, feedURL) 146 139 return err 147 140 } 148 141 149 - func uriOrNil(category, v string) any { 142 + func uriOrNil(v string) any { 150 143 if v == "" { 151 144 return nil 152 145 } ··· 167 160 if err != nil { 168 161 return err 169 162 } 170 - return db.DecrementSubscriberCount(ctx, feedURL) 163 + return db.decrementSubscriberCount(ctx, feedURL) 171 164 } 172 165 173 166 func (db *DB) DeleteAllSubscriptions(ctx context.Context, userDID string) error { ··· 466 459 } 467 460 continue 468 461 } 469 - result, err := insertStmt.ExecContext(ctx, userDID, sub.FeedURL, nilIfEmpty(sub.Title), sub.Category, uriOrNil(sub.Category, sub.URI), uriOrNil(sub.Category, sub.CID)) 462 + result, err := insertStmt.ExecContext(ctx, userDID, sub.FeedURL, nilIfEmpty(sub.Title), sub.Category, uriOrNil(sub.URI), uriOrNil(sub.CID)) 470 463 if err != nil { 471 464 return err 472 465 }
+1 -1
internal/db/follow.go
··· 21 21 ON CONFLICT(user_did, target_did) DO UPDATE SET 22 22 uri = excluded.uri, 23 23 cid = excluded.cid 24 - `, userDID, targetDID, uriOrNil("", uri), uriOrNil("", cid)) 24 + `, userDID, targetDID, uriOrNil(uri), uriOrNil(cid)) 25 25 return err 26 26 } 27 27