this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

pds: remove FeedGenerator

-439
-431
pds/feedgen.go
··· 1 - package pds 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - "log/slog" 7 - "sort" 8 - "strings" 9 - "time" 10 - 11 - bsky "github.com/bluesky-social/indigo/api/bsky" 12 - "github.com/bluesky-social/indigo/indexer" 13 - lexutil "github.com/bluesky-social/indigo/lex/util" 14 - "github.com/bluesky-social/indigo/models" 15 - 16 - "github.com/ipfs/go-cid" 17 - "go.opentelemetry.io/otel" 18 - "gorm.io/gorm" 19 - ) 20 - 21 - type FeedGenerator struct { 22 - db *gorm.DB 23 - ix *indexer.Indexer 24 - 25 - readRecord ReadRecordFunc 26 - 27 - log *slog.Logger 28 - } 29 - 30 - func NewFeedGenerator(db *gorm.DB, ix *indexer.Indexer, readRecord ReadRecordFunc, log *slog.Logger) (*FeedGenerator, error) { 31 - return &FeedGenerator{ 32 - db: db, 33 - ix: ix, 34 - readRecord: readRecord, 35 - log: log, 36 - }, nil 37 - } 38 - 39 - type ReadRecordFunc func(context.Context, models.Uid, cid.Cid) (lexutil.CBOR, error) 40 - 41 - /* 42 - type HydratedFeedItem struct { 43 - Uri string 44 - RepostedBy *bsky.ActorDefs_ProfileViewBasic 45 - Record any 46 - ReplyCount int64 47 - RepostCount int64 48 - UpvoteCount int64 49 - DownvoteCount int64 50 - MyState *bsky.FeedGetAuthorFeed_MyState 51 - Cid string 52 - Author *bsky.ActorDefs_ProfileViewBasic 53 - TrendedBy *bsky.ActorDefs_ProfileViewBasic 54 - Embed *bsky.FeedEmbed 55 - IndexedAt string 56 - } 57 - */ 58 - 59 - func (fg *FeedGenerator) hydrateFeed(ctx context.Context, items []*models.FeedPost, reposts []*models.RepostRecord) ([]*bsky.FeedDefs_FeedViewPost, error) { 60 - out := make([]*bsky.FeedDefs_FeedViewPost, 0, len(items)) 61 - for _, it := range items { 62 - hit, err := fg.hydrateItem(ctx, it) 63 - if err != nil { 64 - return nil, err 65 - } 66 - 67 - out = append(out, hit) 68 - } 69 - 70 - if len(reposts) > 0 { 71 - for _, rp := range reposts { 72 - var fp models.FeedPost 73 - if err := fg.db.First(&fp, "id = ?", rp.Post).Error; err != nil { 74 - return nil, err 75 - } 76 - 77 - fvp, err := fg.hydrateItem(ctx, &fp) 78 - if err != nil { 79 - return nil, err 80 - } 81 - 82 - reposter, err := fg.getActorRefInfo(ctx, rp.Reposter) 83 - if err != nil { 84 - return nil, err 85 - } 86 - 87 - fvp.Reason = &bsky.FeedDefs_FeedViewPost_Reason{ 88 - FeedDefs_ReasonRepost: &bsky.FeedDefs_ReasonRepost{ 89 - By: reposter, 90 - IndexedAt: rp.CreatedAt.Format(time.RFC3339), 91 - }, 92 - } 93 - 94 - out = append(out, fvp) 95 - } 96 - } 97 - 98 - return out, nil 99 - } 100 - 101 - func (fg *FeedGenerator) didForUser(ctx context.Context, user models.Uid) (string, error) { 102 - // TODO: cache the shit out of this 103 - var ai models.ActorInfo 104 - if err := fg.db.First(&ai, "uid = ?", user).Error; err != nil { 105 - return "", err 106 - } 107 - 108 - return ai.Did, nil 109 - } 110 - 111 - func (fg *FeedGenerator) getActorRefInfo(ctx context.Context, user models.Uid) (*bsky.ActorDefs_ProfileViewBasic, error) { 112 - // TODO: cache the shit out of this too 113 - var ai models.ActorInfo 114 - if err := fg.db.First(&ai, "uid = ?", user).Error; err != nil { 115 - return nil, err 116 - } 117 - 118 - return ai.ActorRef(), nil 119 - } 120 - 121 - func (fg *FeedGenerator) hydrateItem(ctx context.Context, item *models.FeedPost) (*bsky.FeedDefs_FeedViewPost, error) { 122 - authorDid, err := fg.didForUser(ctx, item.Author) 123 - if err != nil { 124 - return nil, err 125 - } 126 - 127 - out := &bsky.FeedDefs_FeedViewPost{} 128 - 129 - out.Post = &bsky.FeedDefs_PostView{ 130 - Uri: "at://" + authorDid + "/app.bsky.feed.post/" + item.Rkey, 131 - ReplyCount: &item.ReplyCount, 132 - RepostCount: &item.RepostCount, 133 - LikeCount: &item.UpCount, 134 - Cid: item.Cid, 135 - IndexedAt: item.UpdatedAt.Format(time.RFC3339), 136 - } 137 - 138 - author, err := fg.getActorRefInfo(ctx, item.Author) 139 - if err != nil { 140 - return nil, err 141 - } 142 - 143 - out.Post.Author = author 144 - 145 - reccid, err := cid.Decode(item.Cid) 146 - if err != nil { 147 - return nil, err 148 - } 149 - 150 - rec, err := fg.readRecord(ctx, item.Author, reccid) 151 - if err != nil { 152 - return nil, err 153 - } 154 - 155 - out.Post.Record = &lexutil.LexiconTypeDecoder{Val: rec} 156 - 157 - return out, nil 158 - } 159 - 160 - func (fg *FeedGenerator) getPostViewerState(ctx context.Context, item uint, viewer models.Uid, viewerDid string) (*bsky.FeedDefs_ViewerState, error) { 161 - var out bsky.FeedDefs_ViewerState 162 - 163 - var vote models.VoteRecord 164 - if err := fg.db.Find(&vote, "post = ? AND voter = ?", item, viewer).Error; err != nil { 165 - return nil, err 166 - } 167 - 168 - if vote.ID != 0 { 169 - vuri := fmt.Sprintf("at://%s/app.bsky.feed.vote/%s", viewerDid, vote.Rkey) 170 - out.Like = &vuri 171 - } 172 - 173 - var rep models.RepostRecord 174 - if err := fg.db.Find(&rep, "post = ? AND reposter = ?", item, viewer).Error; err != nil { 175 - return nil, err 176 - } 177 - 178 - if rep.ID != 0 { 179 - rpuri := fmt.Sprintf("at://%s/app.bsky.feed.repost/%s", viewerDid, rep.Rkey) 180 - out.Repost = &rpuri 181 - } 182 - 183 - return &out, nil 184 - } 185 - 186 - func (fg *FeedGenerator) GetTimeline(ctx context.Context, user *User, algo string, before string, limit int) ([]*bsky.FeedDefs_FeedViewPost, error) { 187 - ctx, span := otel.Tracer("feedgen").Start(context.Background(), "GetTimeline") 188 - defer span.End() 189 - 190 - // TODO: this query is just a temporary hack... 191 - var feed []*models.FeedPost 192 - if err := fg.db.Debug().Find(&feed, "author in (?)", 193 - fg.db.Model(models.FollowRecord{}).Where("follower = ?", user.ID).Select("target"), 194 - ).Error; err != nil { 195 - return nil, err 196 - } 197 - 198 - var rps []*models.RepostRecord 199 - if err := fg.db.Debug().Find(&rps, "reposter in (?)", 200 - fg.db.Model(models.FollowRecord{}).Where("follower = ?", user.ID).Select("target"), 201 - ).Error; err != nil { 202 - return nil, err 203 - } 204 - 205 - fout, err := fg.hydrateFeed(ctx, feed, rps) 206 - if err != nil { 207 - return nil, fmt.Errorf("hydrating feed: %w", err) 208 - } 209 - 210 - sort.Slice(fout, func(i, j int) bool { 211 - ti, _ := time.Parse(time.RFC3339, fout[i].Post.IndexedAt) 212 - tj, _ := time.Parse(time.RFC3339, fout[j].Post.IndexedAt) 213 - 214 - return tj.Before(ti) 215 - }) 216 - 217 - return fg.personalizeFeed(ctx, fout, user) 218 - } 219 - 220 - func (fg *FeedGenerator) personalizeFeed(ctx context.Context, feed []*bsky.FeedDefs_FeedViewPost, viewer *User) ([]*bsky.FeedDefs_FeedViewPost, error) { 221 - for _, p := range feed { 222 - 223 - // TODO: its inefficient to have to call 'GetPost' again here when we could instead be doing that inside the 'hydrateFeed' call earlier. 224 - // However, if we introduce per-user information into hydrateFeed, then 225 - // it cannot be effectively cached, so we separate the 'cacheable' 226 - // portion of feed generation from the 'per user' portion. An 227 - // optimization could be to hide the internal post IDs in the Post 228 - // structs for internal use (stripped out before sending to client) 229 - item, err := fg.ix.GetPost(ctx, p.Post.Uri) 230 - if err != nil { 231 - return nil, err 232 - } 233 - 234 - vs, err := fg.getPostViewerState(ctx, item.ID, viewer.ID, viewer.Did) 235 - if err != nil { 236 - return nil, fmt.Errorf("getting viewer state: %w", err) 237 - } 238 - 239 - p.Post.Viewer = vs 240 - } 241 - 242 - return feed, nil 243 - } 244 - 245 - func (fg *FeedGenerator) GetAuthorFeed(ctx context.Context, user *User, before string, limit int) ([]*bsky.FeedDefs_FeedViewPost, error) { 246 - ctx, span := otel.Tracer("feedgen").Start(context.Background(), "GetAuthorFeed") 247 - defer span.End() 248 - 249 - // for memory efficiency, should probably return the actual type that goes out to the user... 250 - // bsky.FeedGetAuthorFeed_FeedItem 251 - 252 - var feed []*models.FeedPost 253 - if err := fg.db.Find(&feed, "author = ?", user.ID).Error; err != nil { 254 - return nil, err 255 - } 256 - 257 - var reposts []*models.RepostRecord 258 - if err := fg.db.Find(&reposts, "reposter = ?", user.ID).Error; err != nil { 259 - return nil, err 260 - } 261 - 262 - fout, err := fg.hydrateFeed(ctx, feed, reposts) 263 - if err != nil { 264 - return nil, fmt.Errorf("hydrating feed: %w", err) 265 - } 266 - 267 - return fg.personalizeFeed(ctx, fout, user) 268 - } 269 - 270 - func (fg *FeedGenerator) GetActorProfileByID(ctx context.Context, actor uint) (*models.ActorInfo, error) { 271 - var ai models.ActorInfo 272 - if err := fg.db.First(&ai, "id = ?", actor).Error; err != nil { 273 - return nil, fmt.Errorf("getActorProfileByID: %w", err) 274 - } 275 - 276 - return &ai, nil 277 - } 278 - func (fg *FeedGenerator) GetActorProfile(ctx context.Context, actor string) (*models.ActorInfo, error) { 279 - fmt.Println("get actor profile: ", actor) 280 - var ai models.ActorInfo 281 - if strings.HasPrefix(actor, "did:") { 282 - if err := fg.db.First(&ai, "did = ?", actor).Error; err != nil { 283 - return nil, err 284 - } 285 - } else { 286 - if err := fg.db.First(&ai, "handle = ?", actor).Error; err != nil { 287 - return nil, err 288 - } 289 - } 290 - 291 - return &ai, nil 292 - } 293 - 294 - type ThreadPost struct { 295 - Post *bsky.FeedDefs_FeedViewPost 296 - PostID uint 297 - 298 - ParentUri string 299 - Parent *ThreadPost 300 - } 301 - 302 - func (fg *FeedGenerator) GetPostThread(ctx context.Context, uri string, depth int) (*ThreadPost, error) { 303 - post, err := fg.ix.GetPost(ctx, uri) 304 - if err != nil { 305 - return nil, fmt.Errorf("getting post for thread: %w", err) 306 - } 307 - 308 - hi, err := fg.hydrateItem(ctx, post) 309 - if err != nil { 310 - return nil, err 311 - } 312 - 313 - p, ok := hi.Post.Record.Val.(*bsky.FeedPost) 314 - if !ok { 315 - return nil, fmt.Errorf("getPostThread can only operate on app.bsky.feed.post records") 316 - } 317 - 318 - out := &ThreadPost{ 319 - Post: hi, 320 - PostID: post.ID, 321 - } 322 - 323 - if p.Reply != nil { 324 - out.ParentUri = p.Reply.Parent.Uri 325 - if depth > 0 { 326 - 327 - parent, err := fg.GetPostThread(ctx, p.Reply.Parent.Uri, depth-1) 328 - if err != nil { 329 - // TODO: check for and handle 'not found' 330 - return nil, err 331 - } 332 - out.Parent = parent 333 - } 334 - } 335 - 336 - return out, nil 337 - } 338 - 339 - type HydratedVote struct { 340 - Actor *bsky.ActorDefs_ProfileViewBasic 341 - Direction string 342 - IndexedAt time.Time 343 - CreatedAt string 344 - } 345 - 346 - func (fg *FeedGenerator) hydrateVote(ctx context.Context, v *models.VoteRecord) (*HydratedVote, error) { 347 - aref, err := fg.getActorRefInfo(ctx, v.Voter) 348 - if err != nil { 349 - return nil, err 350 - } 351 - 352 - return &HydratedVote{ 353 - Actor: aref, 354 - Direction: v.Dir.String(), 355 - IndexedAt: v.UpdatedAt, 356 - CreatedAt: v.Created, 357 - }, nil 358 - } 359 - 360 - func (fg *FeedGenerator) GetVotes(ctx context.Context, uri string, pcid cid.Cid, limit int, before string) ([]*HydratedVote, error) { 361 - if before != "" { 362 - fg.log.Warn("not respecting 'before' yet") 363 - } 364 - 365 - p, err := fg.ix.GetPost(ctx, uri) 366 - if err != nil { 367 - return nil, err 368 - } 369 - 370 - if p.Cid != pcid.String() { 371 - return nil, fmt.Errorf("listing likes of old post versions not supported") 372 - } 373 - 374 - var voterecs []models.VoteRecord 375 - if err := fg.db.Limit(limit).Find(&voterecs, "post = ?", p.ID).Error; err != nil { 376 - return nil, err 377 - } 378 - 379 - var out []*HydratedVote 380 - for _, vr := range voterecs { 381 - hv, err := fg.hydrateVote(ctx, &vr) 382 - if err != nil { 383 - return nil, err 384 - } 385 - out = append(out, hv) 386 - } 387 - 388 - return out, nil 389 - } 390 - 391 - type FollowInfo struct { 392 - Follower *bsky.ActorDefs_ProfileViewBasic 393 - Subject *bsky.ActorDefs_ProfileViewBasic 394 - CreatedAt string 395 - IndexedAt string 396 - } 397 - 398 - func (fg *FeedGenerator) GetFollows(ctx context.Context, user string, limit int, before string) ([]*FollowInfo, error) { 399 - var follows []models.FollowRecord 400 - if err := fg.db.Limit(limit).Find(&follows, "follower = (?)", fg.db.Model(models.ActorInfo{}).Where("did = ? or handle = ?", user, user).Select("uid")).Error; err != nil { 401 - return nil, err 402 - } 403 - 404 - profile, err := fg.GetActorProfile(ctx, user) 405 - if err != nil { 406 - return nil, err 407 - } 408 - 409 - ai, err := fg.getActorRefInfo(ctx, profile.Uid) 410 - if err != nil { 411 - return nil, err 412 - } 413 - 414 - out := []*FollowInfo{} 415 - for _, f := range follows { 416 - fai, err := fg.getActorRefInfo(ctx, f.Target) 417 - if err != nil { 418 - return nil, err 419 - } 420 - 421 - out = append(out, &FollowInfo{ 422 - Follower: ai, 423 - Subject: fai, 424 - CreatedAt: f.CreatedAt.Format(time.RFC3339), 425 - IndexedAt: f.CreatedAt.Format(time.RFC3339), 426 - }) 427 - 428 - } 429 - 430 - return nil, nil 431 - }
-8
pds/server.go
··· 40 40 db *gorm.DB 41 41 cs carstore.CarStore 42 42 repoman *repomgr.RepoManager 43 - feedgen *FeedGenerator 44 43 indexer *indexer.Indexer 45 44 events *events.EventManager 46 45 signingKey *did.PrivKey ··· 104 103 105 104 //ix.SendRemoteFollow = s.sendRemoteFollow 106 105 ix.CreateExternalUser = s.createExternalUser 107 - 108 - feedgen, err := NewFeedGenerator(db, ix, s.readRecordFunc, s.log) 109 - if err != nil { 110 - return nil, err 111 - } 112 - 113 - s.feedgen = feedgen 114 106 115 107 return s, nil 116 108 }