this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor to make it replies and bookmarks and then added a new feed type (#10)

authored by

Will Andrews and committed by
GitHub
47707948 c3e4aac5

+202 -121
+5 -4
bookmark_handler.go
··· 9 9 "net/http" 10 10 "net/url" 11 11 "strings" 12 + "time" 12 13 13 14 "github.com/bluesky-social/indigo/api/bsky" 14 15 apibsky "github.com/bluesky-social/indigo/api/bsky" ··· 75 76 content = fmt.Sprintf("%s...", content[:75]) 76 77 } 77 78 78 - err = s.bookmarkStore.CreateBookmark(rkey, postURI, atPostURI, post.Author.Did, post.Author.Handle, usersDid, content) 79 + err = s.bookmarkStore.CreateBookmark(rkey, postURI, atPostURI, post.Author.Did, post.Author.Handle, usersDid, content, time.Now().UnixMilli()) 79 80 if err != nil { 80 81 if errors.Is(err, store.ErrBookmarkAlreadyExists) { 81 82 return ··· 131 132 return 132 133 } 133 134 134 - err = s.bookmarkStore.DeleteFeedPostsForBookmarkedPostURIandUserDID(bookmark.PostATURI, usersDid) 135 + err = s.bookmarkStore.DeleteRepliedPostsForBookmarkedPostURIandUserDID(bookmark.PostATURI, usersDid) 135 136 if err != nil { 136 - slog.Error("deleting feed items for bookmark", "error", err) 137 - http.Error(w, "deleting feed items for bookmark", http.StatusInternalServerError) 137 + slog.Error("deleting replied posts for bookmark", "error", err) 138 + http.Error(w, "deleting replied posts for bookmark", http.StatusInternalServerError) 138 139 return 139 140 } 140 141
+3 -3
dm_handler.go
··· 243 243 244 244 rkey := getRKeyFromATURI(msg.Embed.Record.URI) 245 245 246 - err := d.bookmarkStore.CreateBookmark(rkey, publicURI, msg.Embed.Record.URI, msg.Embed.Record.Author.Did, msg.Embed.Record.Author.Handle, msg.Sender.Did, content) 246 + err := d.bookmarkStore.CreateBookmark(rkey, publicURI, msg.Embed.Record.URI, msg.Embed.Record.Author.Did, msg.Embed.Record.Author.Handle, msg.Sender.Did, content, time.Now().UnixMilli()) 247 247 if err != nil { 248 248 return fmt.Errorf("creating bookmark: %w", err) 249 249 } ··· 253 253 func (d *DmService) handleDeleteBookmark(msg Message) error { 254 254 rkey := getRKeyFromATURI(msg.Embed.Record.URI) 255 255 256 - err := d.bookmarkStore.DeleteFeedPostsForBookmarkedPostURIandUserDID(msg.Embed.Record.URI, msg.Sender.Did) 256 + err := d.bookmarkStore.DeleteRepliedPostsForBookmarkedPostURIandUserDID(msg.Embed.Record.URI, msg.Sender.Did) 257 257 if err != nil { 258 - return fmt.Errorf("failed to delete feed posts of replies to bookmark for user: %w", err) 258 + return fmt.Errorf("failed to delete replied posts for bookmark for user: %w", err) 259 259 } 260 260 261 261 err = d.bookmarkStore.DeleteBookmark(rkey, msg.Sender.Did)
+4 -1
feed_handlers.go
··· 104 104 DID: fmt.Sprintf("did:web:%s", s.feedHost), 105 105 Feeds: []FeedRespsonse{ 106 106 { 107 - URI: fmt.Sprintf("at://%s/app.bsky.feed.generator/wills-test", s.feedDidBase), 107 + URI: fmt.Sprintf("at://%s/app.bsky.feed.generator/bookmark-replies", s.feedDidBase), 108 + }, 109 + { 110 + URI: fmt.Sprintf("at://%s/app.bsky.feed.generator/bookmarks", s.feedDidBase), 108 111 }, 109 112 }, 110 113 }
+64 -11
feedgenerator.go
··· 5 5 "fmt" 6 6 "log/slog" 7 7 "strconv" 8 + "strings" 8 9 9 10 "github.com/willdot/bskyfeedgen/store" 10 11 ) 11 12 12 - type feedStore interface { 13 - GetUsersFeed(usersDID string, cursor int64, limit int) ([]store.FeedPost, error) 14 - AddFeedPost(feedPost store.FeedPost) error 13 + type repliesStore interface { 14 + GetUsersReplies(usersDID string, cursor int64, limit int) ([]store.ReplyPost, error) 15 + GetBookmarksForUserWithPaging(userDID string, cursor int64, limit int) ([]store.Bookmark, error) 16 + AddRepliedPost(replyPost store.ReplyPost) error 15 17 } 16 18 17 19 type FeedGenerator struct { 18 - store feedStore 20 + store repliesStore 19 21 } 20 22 21 - func NewFeedGenerator(store feedStore) *FeedGenerator { 23 + func NewFeedGenerator(store repliesStore) *FeedGenerator { 22 24 return &FeedGenerator{ 23 25 store: store, 24 26 } 25 27 } 26 28 27 29 func (f *FeedGenerator) GetFeed(ctx context.Context, userDID, feed, cursor string, limit int) (FeedReponse, error) { 30 + switch { 31 + case strings.Contains(feed, "bookmark-replies"): 32 + return f.getBookmarkRepliesFeed(ctx, userDID, cursor, limit) 33 + case strings.Contains(feed, "bookmarks"): 34 + return f.getBookmarksFeed(ctx, userDID, cursor, limit) 35 + 36 + default: 37 + return FeedReponse{ 38 + Feed: make([]FeedItem, 0), 39 + }, fmt.Errorf("invalid feed requested") 40 + } 41 + } 42 + 43 + func (f *FeedGenerator) getBookmarkRepliesFeed(ctx context.Context, userDID, cursor string, limit int) (FeedReponse, error) { 28 44 resp := FeedReponse{ 29 45 Feed: make([]FeedItem, 0), 30 46 } ··· 38 54 cursorInt = 9999999999999 39 55 } 40 56 41 - usersFeed, err := f.store.GetUsersFeed(userDID, int64(cursorInt), limit) 57 + usersReplies, err := f.store.GetUsersReplies(userDID, int64(cursorInt), limit) 42 58 if err != nil { 43 - return resp, fmt.Errorf("get users feed items from DB: %w", err) 59 + return resp, fmt.Errorf("get users replies from DB: %w", err) 44 60 } 45 61 46 - feedItems := make([]FeedItem, 0, len(usersFeed)) 47 - for _, post := range usersFeed { 62 + feedItems := make([]FeedItem, 0, len(usersReplies)) 63 + for _, post := range usersReplies { 48 64 feedItems = append(feedItems, FeedItem{ 49 65 Post: post.ReplyURI, 50 66 }) ··· 54 70 55 71 // only set the return cursor if there was a record returned and that the len of records 56 72 // being returned is the same as the limit 57 - if len(usersFeed) > 0 && len(usersFeed) == limit { 58 - lastFeedItem := usersFeed[len(usersFeed)-1] 73 + if len(usersReplies) > 0 && len(usersReplies) == limit { 74 + lastFeedItem := usersReplies[len(usersReplies)-1] 75 + resp.Cursor = fmt.Sprintf("%d", lastFeedItem.CreatedAt) 76 + } 77 + return resp, nil 78 + } 79 + 80 + func (f *FeedGenerator) getBookmarksFeed(ctx context.Context, userDID, cursor string, limit int) (FeedReponse, error) { 81 + resp := FeedReponse{ 82 + Feed: make([]FeedItem, 0), 83 + } 84 + 85 + cursorInt, err := strconv.Atoi(cursor) 86 + if err != nil && cursor != "" { 87 + slog.Error("convert cursor to int", "error", err, "cursor value", cursor) 88 + } 89 + if cursorInt == 0 { 90 + // if no cursor provided use a date waaaaay in the future to start the less than query 91 + cursorInt = 9999999999999 92 + } 93 + 94 + usersBookmarks, err := f.store.GetBookmarksForUserWithPaging(userDID, int64(cursorInt), limit) 95 + if err != nil { 96 + return resp, fmt.Errorf("get users bookmarks from DB: %w", err) 97 + } 98 + 99 + feedItems := make([]FeedItem, 0, len(usersBookmarks)) 100 + for _, bookmark := range usersBookmarks { 101 + feedItems = append(feedItems, FeedItem{ 102 + Post: bookmark.PostATURI, 103 + }) 104 + } 105 + 106 + resp.Feed = feedItems 107 + 108 + // only set the return cursor if there was a record returned and that the len of records 109 + // being returned is the same as the limit 110 + if len(usersBookmarks) > 0 && len(usersBookmarks) == limit { 111 + lastFeedItem := usersBookmarks[len(usersBookmarks)-1] 59 112 resp.Cursor = fmt.Sprintf("%d", lastFeedItem.CreatedAt) 60 113 } 61 114 return resp, nil
+6 -6
firehose_handler.go
··· 14 14 ) 15 15 16 16 type HandlerStore interface { 17 - AddFeedPost(feedItem store.FeedPost) error 17 + AddRepliedPost(replyPost store.ReplyPost) error 18 18 GetBookmarksForPost(postURI string) ([]string, error) 19 19 } 20 20 ··· 68 68 } 69 69 70 70 replyPostURI := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", event.Did, event.Commit.RKey) 71 - h.createFeedPostForSubscribedUsers(subscribedDids, replyPostURI, subscribedPostURI, createdAt.UnixMilli()) 71 + h.createReplyPostForSubscribedUsers(subscribedDids, replyPostURI, subscribedPostURI, createdAt.UnixMilli()) 72 72 return nil 73 73 } 74 74 ··· 83 83 return dids 84 84 } 85 85 86 - func (h *handler) createFeedPostForSubscribedUsers(usersDids []string, replyPostURI, subscribedPostURI string, createdAt int64) { 86 + func (h *handler) createReplyPostForSubscribedUsers(usersDids []string, replyPostURI, subscribedPostURI string, createdAt int64) { 87 87 for _, did := range usersDids { 88 - feedItem := store.FeedPost{ 88 + repliedPost := store.ReplyPost{ 89 89 ReplyURI: replyPostURI, 90 90 UserDID: did, 91 91 SubscribedPostURI: subscribedPostURI, 92 92 CreatedAt: createdAt, 93 93 } 94 - err := h.store.AddFeedPost(feedItem) 94 + err := h.store.AddRepliedPost(repliedPost) 95 95 if err != nil { 96 - slog.Error("add users feed item", "error", err, "did", did, "reply post URI", replyPostURI) 96 + slog.Error("add users replied post", "error", err, "did", did, "reply post URI", replyPostURI) 97 97 _ = bugsnag.Notify(err) 98 98 continue 99 99 }
+2 -2
server.go
··· 28 28 } 29 29 30 30 type BookmarkStore interface { 31 - CreateBookmark(postRKey, postURI, postATURI, authorDID, authorHandle, userDID, content string) error 31 + CreateBookmark(postRKey, postURI, postATURI, authorDID, authorHandle, userDID, content string, createdAt int64) error 32 32 GetBookmarksForUser(userDID string) ([]store.Bookmark, error) 33 33 DeleteBookmark(postRKey, userDID string) error 34 34 GetBookmarkByRKeyForUser(rkey, userDID string) (*store.Bookmark, error) 35 - DeleteFeedPostsForBookmarkedPostURIandUserDID(subscribedPostURI, userDID string) error 35 + DeleteRepliedPostsForBookmarkedPostURIandUserDID(subscribedPostURI, userDID string) error 36 36 } 37 37 38 38 type OauthRequestStore interface {
+29 -5
store/bookmark.go
··· 19 19 "authorHandle" TEXT, 20 20 "userDID" TEXT, 21 21 "content" TEXT, 22 + "createdAt" integer NOT NULL, 22 23 UNIQUE(postRKey, userDID) 23 24 );` 24 25 ··· 45 46 AuthorHandle string 46 47 UserDID string 47 48 Content string 49 + CreatedAt int64 48 50 } 49 51 50 - func (s *Store) CreateBookmark(postRKey, postURI, postATURI, authorDID, authorHandle, userDID, content string) error { 51 - sql := `INSERT INTO bookmarks (postRKey, postURI,postATURI, authorDID, authorHandle, userDID, content) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(postRKey, userDID) DO NOTHING;` 52 - res, err := s.db.Exec(sql, postRKey, postURI, postATURI, authorDID, authorHandle, userDID, content) 52 + func (s *Store) CreateBookmark(postRKey, postURI, postATURI, authorDID, authorHandle, userDID, content string, createdAt int64) error { 53 + sql := `INSERT INTO bookmarks (postRKey, postURI,postATURI, authorDID, authorHandle, userDID, content, createdAt) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(postRKey, userDID) DO NOTHING;` 54 + res, err := s.db.Exec(sql, postRKey, postURI, postATURI, authorDID, authorHandle, userDID, content, createdAt) 53 55 if err != nil { 54 56 return fmt.Errorf("exec insert bookmark: %w", err) 55 57 } ··· 61 63 } 62 64 63 65 func (s *Store) GetBookmarksForUser(userDID string) ([]Bookmark, error) { 64 - sql := "SELECT id, postRKey, postURI, postATURI, authorDID, authorHandle, userDID, content FROM bookmarks WHERE userDID = ?;" 66 + sql := "SELECT id, postRKey, postURI, postATURI, authorDID, authorHandle, userDID, content, createdAt FROM bookmarks WHERE userDID = ?;" 65 67 rows, err := s.db.Query(sql, userDID) 66 68 if err != nil { 67 69 return nil, fmt.Errorf("run query to get bookmarked posts for user: %w", err) ··· 71 73 var results []Bookmark 72 74 for rows.Next() { 73 75 var bookmark Bookmark 74 - if err := rows.Scan(&bookmark.ID, &bookmark.PostRKey, &bookmark.PostURI, &bookmark.PostATURI, &bookmark.AuthorDID, &bookmark.AuthorHandle, &bookmark.UserDID, &bookmark.Content); err != nil { 76 + if err := rows.Scan(&bookmark.ID, &bookmark.PostRKey, &bookmark.PostURI, &bookmark.PostATURI, &bookmark.AuthorDID, &bookmark.AuthorHandle, &bookmark.UserDID, &bookmark.Content, &bookmark.CreatedAt); err != nil { 77 + return nil, fmt.Errorf("scan row: %w", err) 78 + } 79 + 80 + results = append(results, bookmark) 81 + } 82 + return results, nil 83 + } 84 + 85 + func (s *Store) GetBookmarksForUserWithPaging(userDID string, cursor int64, limit int) ([]Bookmark, error) { 86 + sql := `SELECT id, postRKey, postURI, postATURI, authorDID, authorHandle, userDID, content, createdAt FROM bookmarks 87 + WHERE userDID = ? AND createdAt < ? 88 + ORDER BY createdAt DESC LIMIT ?;` 89 + rows, err := s.db.Query(sql, userDID, cursor, limit) 90 + if err != nil { 91 + return nil, fmt.Errorf("run query to get bookmarked posts for user: %w", err) 92 + } 93 + defer rows.Close() 94 + 95 + var results []Bookmark 96 + for rows.Next() { 97 + var bookmark Bookmark 98 + if err := rows.Scan(&bookmark.ID, &bookmark.PostRKey, &bookmark.PostURI, &bookmark.PostATURI, &bookmark.AuthorDID, &bookmark.AuthorHandle, &bookmark.UserDID, &bookmark.Content, &bookmark.CreatedAt); err != nil { 75 99 return nil, fmt.Errorf("scan row: %w", err) 76 100 } 77 101
+2 -2
store/database.go
··· 32 32 return nil, fmt.Errorf("ping db: %w", err) 33 33 } 34 34 35 - err = createFeedTable(db) 35 + err = createRepliesTable(db) 36 36 if err != nil { 37 - return nil, fmt.Errorf("creating feed table: %w", err) 37 + return nil, fmt.Errorf("creating replies table: %w", err) 38 38 } 39 39 40 40 err = createBookmarksTable(db)
-87
store/feed.go
··· 1 - package store 2 - 3 - import ( 4 - "database/sql" 5 - "fmt" 6 - "log/slog" 7 - ) 8 - 9 - func createFeedTable(db *sql.DB) error { 10 - createFeedTableSQL := `CREATE TABLE IF NOT EXISTS feed ( 11 - "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, 12 - "replyURI" TEXT, 13 - "userDID" TEXT, 14 - "subscribedPostURI" TEXT, 15 - "createdAt" integer NOT NULL, 16 - UNIQUE(replyURI, userDID) 17 - );` 18 - 19 - slog.Info("Create feed table...") 20 - statement, err := db.Prepare(createFeedTableSQL) 21 - if err != nil { 22 - return fmt.Errorf("prepare DB statement to create feeds table: %w", err) 23 - } 24 - _, err = statement.Exec() 25 - if err != nil { 26 - return fmt.Errorf("exec sql statement to create feeds table: %w", err) 27 - } 28 - slog.Info("feed table created") 29 - 30 - return nil 31 - } 32 - 33 - type FeedPost struct { 34 - ID int 35 - ReplyURI string 36 - UserDID string 37 - SubscribedPostURI string 38 - CreatedAt int64 39 - } 40 - 41 - func (s *Store) AddFeedPost(feedPost FeedPost) error { 42 - sql := `INSERT INTO feed (replyURI, userDID, subscribedPostURI, createdAt) VALUES (?, ?, ?, ?) ON CONFLICT(replyURI, userDID) DO NOTHING;` 43 - _, err := s.db.Exec(sql, feedPost.ReplyURI, feedPost.UserDID, feedPost.SubscribedPostURI, feedPost.CreatedAt) 44 - if err != nil { 45 - return fmt.Errorf("exec insert feed item: %w", err) 46 - } 47 - return nil 48 - } 49 - 50 - func (s *Store) GetUsersFeed(usersDID string, cursor int64, limit int) ([]FeedPost, error) { 51 - sql := `SELECT id, replyURI, userDID, subscribedPostURI, createdAt FROM feed 52 - WHERE userDID = ? AND createdAt < ? 53 - ORDER BY createdAt DESC LIMIT ?;` 54 - rows, err := s.db.Query(sql, usersDID, cursor, limit) 55 - if err != nil { 56 - return nil, fmt.Errorf("run query to get users feed posts: %w", err) 57 - } 58 - defer rows.Close() 59 - 60 - feedPosts := make([]FeedPost, 0) 61 - for rows.Next() { 62 - var feedPost FeedPost 63 - if err := rows.Scan(&feedPost.ID, &feedPost.ReplyURI, &feedPost.UserDID, &feedPost.SubscribedPostURI, &feedPost.CreatedAt); err != nil { 64 - return nil, fmt.Errorf("scan row: %w", err) 65 - } 66 - feedPosts = append(feedPosts, feedPost) 67 - } 68 - 69 - return feedPosts, nil 70 - } 71 - 72 - func (s *Store) DeleteFeedPostsForBookmarkedPostURIandUserDID(subscribedPostURI, userDID string) error { 73 - sql := "DELETE FROM feed WHERE subscribedPostURI = ? AND userDID = ?;" 74 - statement, err := s.db.Prepare(sql) 75 - if err != nil { 76 - return fmt.Errorf("prepare delete feed posts: %w", err) 77 - } 78 - res, err := statement.Exec(subscribedPostURI, userDID) 79 - if err != nil { 80 - return fmt.Errorf("exec delete feed posts: %w", err) 81 - } 82 - 83 - n, _ := res.RowsAffected() 84 - 85 - slog.Info("delete feed posts result", "affected rows", n) 86 - return nil 87 - }
+87
store/replies.go
··· 1 + package store 2 + 3 + import ( 4 + "database/sql" 5 + "fmt" 6 + "log/slog" 7 + ) 8 + 9 + func createRepliesTable(db *sql.DB) error { 10 + createRepliesTableSQL := `CREATE TABLE IF NOT EXISTS replies ( 11 + "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, 12 + "replyURI" TEXT, 13 + "userDID" TEXT, 14 + "subscribedPostURI" TEXT, 15 + "createdAt" integer NOT NULL, 16 + UNIQUE(replyURI, userDID) 17 + );` 18 + 19 + slog.Info("Create replies table...") 20 + statement, err := db.Prepare(createRepliesTableSQL) 21 + if err != nil { 22 + return fmt.Errorf("prepare DB statement to create replies table: %w", err) 23 + } 24 + _, err = statement.Exec() 25 + if err != nil { 26 + return fmt.Errorf("exec sql statement to create replies table: %w", err) 27 + } 28 + slog.Info("replies table created") 29 + 30 + return nil 31 + } 32 + 33 + type ReplyPost struct { 34 + ID int 35 + ReplyURI string 36 + UserDID string 37 + SubscribedPostURI string 38 + CreatedAt int64 39 + } 40 + 41 + func (s *Store) AddRepliedPost(replyPost ReplyPost) error { 42 + sql := `INSERT INTO replies (replyURI, userDID, subscribedPostURI, createdAt) VALUES (?, ?, ?, ?) ON CONFLICT(replyURI, userDID) DO NOTHING;` 43 + _, err := s.db.Exec(sql, replyPost.ReplyURI, replyPost.UserDID, replyPost.SubscribedPostURI, replyPost.CreatedAt) 44 + if err != nil { 45 + return fmt.Errorf("exec insert replies post: %w", err) 46 + } 47 + return nil 48 + } 49 + 50 + func (s *Store) GetUsersReplies(usersDID string, cursor int64, limit int) ([]ReplyPost, error) { 51 + sql := `SELECT id, replyURI, userDID, subscribedPostURI, createdAt FROM replies 52 + WHERE userDID = ? AND createdAt < ? 53 + ORDER BY createdAt DESC LIMIT ?;` 54 + rows, err := s.db.Query(sql, usersDID, cursor, limit) 55 + if err != nil { 56 + return nil, fmt.Errorf("run query to get users replied posts: %w", err) 57 + } 58 + defer rows.Close() 59 + 60 + repliedPosts := make([]ReplyPost, 0) 61 + for rows.Next() { 62 + var replyPost ReplyPost 63 + if err := rows.Scan(&replyPost.ID, &replyPost.ReplyURI, &replyPost.UserDID, &replyPost.SubscribedPostURI, &replyPost.CreatedAt); err != nil { 64 + return nil, fmt.Errorf("scan row: %w", err) 65 + } 66 + repliedPosts = append(repliedPosts, replyPost) 67 + } 68 + 69 + return repliedPosts, nil 70 + } 71 + 72 + func (s *Store) DeleteRepliedPostsForBookmarkedPostURIandUserDID(subscribedPostURI, userDID string) error { 73 + sql := "DELETE FROM replies WHERE subscribedPostURI = ? AND userDID = ?;" 74 + statement, err := s.db.Prepare(sql) 75 + if err != nil { 76 + return fmt.Errorf("prepare delete replies: %w", err) 77 + } 78 + res, err := statement.Exec(subscribedPostURI, userDID) 79 + if err != nil { 80 + return fmt.Errorf("exec delete replies: %w", err) 81 + } 82 + 83 + n, _ := res.RowsAffected() 84 + 85 + slog.Info("delete replies result", "affected rows", n) 86 + return nil 87 + }