this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #3 from willdot/feat/add-timestamp-to-feed

Add createdAt timestamp to feed post items so that we can use the cursor and sort

authored by

Will Andrew and committed by
GitHub
778aeced e44c0709

+220 -11
+19 -3
feedgenerator.go
··· 3 3 import ( 4 4 "context" 5 5 "fmt" 6 + "log/slog" 7 + "strconv" 6 8 7 9 "github.com/willdot/bskyfeedgen/store" 8 10 ) 9 11 10 12 type feedStore interface { 11 - GetUsersFeed(usersDID string) ([]store.FeedPost, error) 13 + GetUsersFeed(usersDID string, cursor int64, limit int) ([]store.FeedPost, error) 12 14 } 13 15 14 16 type FeedGenerator struct { ··· 26 28 Feed: make([]FeedItem, 0, 0), 27 29 } 28 30 29 - usersFeed, err := f.store.GetUsersFeed(userDID) 31 + cursorInt, err := strconv.Atoi(cursor) 32 + if err != nil && cursor != "" { 33 + slog.Error("convert cursor to int", "error", err, "cursor value", cursor) 34 + } 35 + if cursorInt == 0 { 36 + // if no cursor provided use a date waaaaay in the future to start the less than query 37 + cursorInt = 9999999999999 38 + } 39 + 40 + usersFeed, err := f.store.GetUsersFeed(userDID, int64(cursorInt), limit) 30 41 if err != nil { 31 42 return resp, fmt.Errorf("get users feed items from DB: %w", err) 32 43 } ··· 39 50 } 40 51 41 52 resp.Feed = feedItems 42 - resp.Cursor = "" 43 53 54 + // only set the return cursor if there was a record returned and that the len of records 55 + // being returned is the same as the limit 56 + if len(usersFeed) > 0 && len(usersFeed) == limit { 57 + lastFeedItem := usersFeed[len(usersFeed)-1] 58 + resp.Cursor = fmt.Sprintf("%d", lastFeedItem.CreatedAt) 59 + } 44 60 return resp, nil 45 61 }
+10 -2
handler.go
··· 6 6 "fmt" 7 7 "log/slog" 8 8 "strings" 9 + "time" 9 10 10 11 apibsky "github.com/bluesky-social/indigo/api/bsky" 11 12 "github.com/bluesky-social/jetstream/pkg/models" ··· 81 82 82 83 slog.Info("post is a reply to a post that users are subscribed to", "subscribed post URI", subscribedPostURI, "dids", subscribedDids, "RKey", event.Commit.RKey) 83 84 85 + createdAt, err := time.Parse(time.RFC3339, post.CreatedAt) 86 + if err != nil { 87 + slog.Error("parsing createdAt time from post", "error", err, "timestamp", post.CreatedAt) 88 + createdAt = time.Now().UTC() 89 + } 90 + 84 91 replyPostURI := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", event.Did, event.Commit.RKey) 85 - h.createFeedPostForSubscribedUsers(subscribedDids, replyPostURI, subscribedPostURI) 92 + h.createFeedPostForSubscribedUsers(subscribedDids, replyPostURI, subscribedPostURI, createdAt.UnixMilli()) 86 93 return nil 87 94 } 88 95 ··· 138 145 return dids 139 146 } 140 147 141 - func (h *handler) createFeedPostForSubscribedUsers(usersDids []string, replyPostURI, subscribedPostURI string) { 148 + func (h *handler) createFeedPostForSubscribedUsers(usersDids []string, replyPostURI, subscribedPostURI string, createdAt int64) { 142 149 for _, did := range usersDids { 143 150 feedItem := store.FeedPost{ 144 151 ReplyURI: replyPostURI, 145 152 UserDID: did, 146 153 SubscribedPostURI: subscribedPostURI, 154 + CreatedAt: createdAt, 147 155 } 148 156 err := h.store.AddFeedPost(feedItem) 149 157 if err != nil {
+179
handler_test.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "testing" 8 + "time" 9 + 10 + "github.com/bluesky-social/indigo/api/atproto" 11 + apibsky "github.com/bluesky-social/indigo/api/bsky" 12 + "github.com/bluesky-social/jetstream/pkg/models" 13 + "github.com/stretchr/testify/assert" 14 + "github.com/stretchr/testify/require" 15 + "github.com/willdot/bskyfeedgen/store" 16 + ) 17 + 18 + func TestIt(t *testing.T) { 19 + ti := time.Now().Add(-time.Minute * 60) 20 + 21 + fmt.Println(ti.UnixMicro()) 22 + t.FailNow() 23 + } 24 + 25 + func TestHandlerReceivesSubscribeMessage(t *testing.T) { 26 + db, err := store.New(":memory:") 27 + require.NoError(t, err) 28 + 29 + handler := handler{ 30 + store: db, 31 + } 32 + 33 + record := apibsky.FeedPost{ 34 + Text: "/subscribe", 35 + Reply: &apibsky.FeedPost_ReplyRef{ 36 + Parent: &atproto.RepoStrongRef{ 37 + Uri: "parent-uri", 38 + }, 39 + }, 40 + } 41 + 42 + recordB, err := json.Marshal(record) 43 + require.NoError(t, err) 44 + 45 + event := &models.Event{ 46 + Did: myDid, 47 + Commit: &models.Commit{ 48 + Operation: models.CommitOperationCreate, 49 + Collection: "app.bsky.feed.post", 50 + RKey: "subscribe-post-rkey", 51 + Record: recordB, 52 + }, 53 + } 54 + 55 + // send the event twice to simulate subscribing to the same post twice, to check only 56 + // 1 subscription is created 57 + err = handler.HandleEvent(context.Background(), event) 58 + require.NoError(t, err) 59 + err = handler.HandleEvent(context.Background(), event) 60 + require.NoError(t, err) 61 + 62 + subs, err := db.GetSubscriptionsForPost("parent-uri") 63 + require.NoError(t, err) 64 + 65 + assert.Len(t, subs, 1) 66 + assert.Equal(t, myDid, subs[0]) 67 + } 68 + 69 + func TestHandlerReceivesReplyToASubscribedPost(t *testing.T) { 70 + db, err := store.New(":memory:") 71 + require.NoError(t, err) 72 + 73 + handler := handler{ 74 + store: db, 75 + } 76 + 77 + // add the subscription 78 + err = db.AddSubscriptionForPost("parent-uri", myDid, "subscribe-post-rkey") 79 + require.NoError(t, err) 80 + 81 + record := apibsky.FeedPost{ 82 + Text: "this is a reply to a post that was subscribed to", 83 + Reply: &apibsky.FeedPost_ReplyRef{ 84 + Parent: &atproto.RepoStrongRef{ 85 + Uri: "parent-uri", 86 + }, 87 + }, 88 + } 89 + 90 + recordB, err := json.Marshal(record) 91 + require.NoError(t, err) 92 + 93 + event := &models.Event{ 94 + Did: "some-random-did", 95 + Commit: &models.Commit{ 96 + Operation: models.CommitOperationCreate, 97 + Collection: "app.bsky.feed.post", 98 + RKey: "reply-post-rkey", 99 + Record: recordB, 100 + }, 101 + } 102 + 103 + err = handler.HandleEvent(context.Background(), event) 104 + require.NoError(t, err) 105 + 106 + feed, err := db.GetUsersFeed(myDid, 9999999999999, 5) 107 + require.NoError(t, err) 108 + 109 + assert.Len(t, feed, 1) 110 + expectedFeedPost := store.FeedPost{ 111 + ID: 1, 112 + ReplyURI: "at://some-random-did/app.bsky.feed.post/reply-post-rkey", 113 + UserDID: myDid, 114 + SubscribedPostURI: "parent-uri", 115 + } 116 + 117 + res := feed[0] 118 + // timestamps are hard to assert so check it's within a few seconds and then remove from 119 + // the result so the rest of the assertion can complete 120 + assert.WithinDuration(t, time.Now(), time.UnixMilli(res.CreatedAt), time.Second) 121 + res.CreatedAt = 0 122 + 123 + assert.Equal(t, expectedFeedPost, res) 124 + } 125 + 126 + func TestHandlerReceivesDeleteEvent(t *testing.T) { 127 + db, err := store.New(":memory:") 128 + require.NoError(t, err) 129 + 130 + handler := handler{ 131 + store: db, 132 + } 133 + 134 + // add the subscription 135 + err = db.AddSubscriptionForPost("parent-uri", myDid, "subscribe-post-rkey") 136 + require.NoError(t, err) 137 + // add in some feed posts 138 + feedPost1 := store.FeedPost{ 139 + ReplyURI: "at://some-random-did-1/app.bsky.feed.post/reply-post-rkey", 140 + UserDID: myDid, 141 + SubscribedPostURI: "parent-uri", 142 + } 143 + feedPost2 := store.FeedPost{ 144 + ReplyURI: "at://some-random-did-2/app.bsky.feed.post/reply-post-rkey", 145 + UserDID: myDid, 146 + SubscribedPostURI: "parent-uri", 147 + } 148 + err = db.AddFeedPost(feedPost1) 149 + require.NoError(t, err) 150 + err = db.AddFeedPost(feedPost2) 151 + require.NoError(t, err) 152 + // add a feed post for a different subscribed post 153 + feedPost3 := store.FeedPost{ 154 + ReplyURI: "at://some-random-did-3/app.bsky.feed.post/reply-post-rkey", 155 + UserDID: myDid, 156 + SubscribedPostURI: "different-parent-uri", 157 + } 158 + err = db.AddFeedPost(feedPost3) 159 + require.NoError(t, err) 160 + 161 + event := &models.Event{ 162 + Did: myDid, 163 + Commit: &models.Commit{ 164 + Operation: models.CommitOperationDelete, 165 + Collection: "app.bsky.feed.post", 166 + RKey: "subscribe-post-rkey", 167 + }, 168 + } 169 + 170 + err = handler.HandleEvent(context.Background(), event) 171 + require.NoError(t, err) 172 + 173 + feed, err := db.GetUsersFeed(myDid, 9999999999999, 5) 174 + require.NoError(t, err) 175 + 176 + assert.Len(t, feed, 1) 177 + feedPost3.ID = 3 178 + assert.Equal(t, feedPost3, feed[0]) 179 + }
+2
main.go
··· 104 104 } 105 105 return nil 106 106 }, retry.Attempts(0)) // retry indefinitly until context canceled 107 + 108 + slog.Warn("exiting consume loop") 107 109 }
+10 -6
store/feed.go
··· 12 12 "replyURI" TEXT, 13 13 "userDID" TEXT, 14 14 "subscribedPostURI" TEXT, 15 + "createdAt" integer NOT NULL, 15 16 UNIQUE(replyURI, userDID) 16 17 );` 17 18 ··· 34 35 ReplyURI string 35 36 UserDID string 36 37 SubscribedPostURI string 38 + CreatedAt int64 37 39 } 38 40 39 41 func (s *Store) AddFeedPost(feedPost FeedPost) error { 40 - sql := `INSERT INTO feed (replyURI, userDID, subscribedPostURI) VALUES (?, ?, ?) ON CONFLICT(replyURI, userDID) DO NOTHING;` 41 - _, err := s.db.Exec(sql, feedPost.ReplyURI, feedPost.UserDID, feedPost.SubscribedPostURI) 42 + sql := `INSERT INTO feed (replyURI, userDID, subscribedPostURI, createdAt) VALUES (?, ?, ?, ?) ON CONFLICT(replyURI, userDID) DO NOTHING;` 43 + _, err := s.db.Exec(sql, feedPost.ReplyURI, feedPost.UserDID, feedPost.SubscribedPostURI, feedPost.CreatedAt) 42 44 if err != nil { 43 45 return fmt.Errorf("exec insert feed item: %w", err) 44 46 } 45 47 return nil 46 48 } 47 49 48 - func (s *Store) GetUsersFeed(usersDID string) ([]FeedPost, error) { 49 - sql := "SELECT id, replyURI, userDID, subscribedPostURI FROM feed WHERE userDID = ?;" 50 - rows, err := s.db.Query(sql, usersDID) 50 + func (s *Store) GetUsersFeed(usersDID string, cursor int64, limit int) ([]FeedPost, error) { 51 + sql := `SELECT id, replyURI, userDID, subscribedPostURI, createdAt FROM feed 52 + WHERE userDID = ? AND createdAt < ? 53 + ORDER BY createdAt DESC LIMIT ?;` 54 + rows, err := s.db.Query(sql, usersDID, cursor, limit) 51 55 if err != nil { 52 56 return nil, fmt.Errorf("run query to get users feed posts: %w", err) 53 57 } ··· 56 60 feedPosts := make([]FeedPost, 0) 57 61 for rows.Next() { 58 62 var feedPost FeedPost 59 - if err := rows.Scan(&feedPost.ID, &feedPost.ReplyURI, &feedPost.UserDID, &feedPost.SubscribedPostURI); err != nil { 63 + if err := rows.Scan(&feedPost.ID, &feedPost.ReplyURI, &feedPost.UserDID, &feedPost.SubscribedPostURI, &feedPost.CreatedAt); err != nil { 60 64 return nil, fmt.Errorf("scan row: %w", err) 61 65 } 62 66 feedPosts = append(feedPosts, feedPost)