this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

another refactor, this time to split handler and consumer out a bit more

+19 -163
+12 -140
consumer.go
··· 3 3 import ( 4 4 "context" 5 5 6 - "encoding/json" 7 6 "fmt" 8 7 "log/slog" 9 - "strings" 10 8 "time" 11 9 12 - apibsky "github.com/bluesky-social/indigo/api/bsky" 13 10 "github.com/bluesky-social/jetstream/pkg/client" 14 11 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 15 - "github.com/bluesky-social/jetstream/pkg/models" 16 - "github.com/bugsnag/bugsnag-go/v2" 17 12 ) 18 13 19 - type ConsumerStore interface { 20 - GetSubscriptionsForPost(postURI string) ([]string, error) 21 - AddSubscriptionForPost(subscribedPostURI, userDid, subscriptionPostRkey string) error 22 - GetSubscribedPostURI(userDID, subscriptionPostRkey string) (string, error) 23 - DeleteSubscriptionForUser(userDID, postURI string) error 24 - DeleteFeedPostsForSubscribedPostURIandUserDID(subscribedPostURI, userDID string) error 25 - } 26 - 27 14 type consumer struct { 28 - cfg *client.ClientConfig 29 - store ConsumerStore 15 + cfg *client.ClientConfig 16 + handler *handler 17 + logger *slog.Logger 30 18 } 31 19 32 - func NewConsumer(jsAddr string, store ConsumerStore) *consumer { 20 + func NewConsumer(jsAddr string, logger *slog.Logger, handler *handler) *consumer { 33 21 cfg := client.DefaultClientConfig() 34 22 if jsAddr != "" { 35 23 cfg.WebsocketURL = jsAddr ··· 38 26 "app.bsky.feed.post", 39 27 } 40 28 cfg.WantedDids = []string{} 29 + 41 30 return &consumer{ 42 - cfg: cfg, 43 - store: store, 31 + cfg: cfg, 32 + logger: logger, 33 + handler: handler, 44 34 } 45 35 } 46 36 47 - func (con *consumer) Consume(ctx context.Context, feedGen *FeedGenerator, logger *slog.Logger) error { 48 - h := &handler{ 49 - feedGenerator: feedGen, 50 - store: con.store, 51 - } 52 - 53 - scheduler := sequential.NewScheduler("jetstream_localdev", logger, h.HandleEvent) 37 + func (c *consumer) Consume(ctx context.Context) error { 38 + scheduler := sequential.NewScheduler("jetstream_localdev", c.logger, c.handler.HandleEvent) 54 39 defer scheduler.Shutdown() 55 40 56 - c, err := client.NewClient(con.cfg, logger, scheduler) 41 + client, err := client.NewClient(c.cfg, c.logger, scheduler) 57 42 if err != nil { 58 43 return fmt.Errorf("failed to create client: %w", err) 59 44 } 60 45 61 46 cursor := time.Now().Add(1 * -time.Minute).UnixMicro() 62 47 63 - if err := c.ConnectAndRead(ctx, &cursor); err != nil { 48 + if err := client.ConnectAndRead(ctx, &cursor); err != nil { 64 49 return fmt.Errorf("connect and read: %w", err) 65 50 } 66 51 67 52 slog.Info("stopping consume") 68 53 return nil 69 54 } 70 - 71 - type handler struct { 72 - feedGenerator *FeedGenerator 73 - store ConsumerStore 74 - } 75 - 76 - func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error { 77 - if event.Commit == nil { 78 - return nil 79 - } 80 - 81 - switch event.Commit.Operation { 82 - case models.CommitOperationCreate: 83 - return h.handleCreateEvent(ctx, event) 84 - case models.CommitOperationDelete: 85 - return h.handleDeleteEvent(ctx, event) 86 - default: 87 - return nil 88 - } 89 - } 90 - 91 - func (h *handler) handleCreateEvent(_ context.Context, event *models.Event) error { 92 - if event.Commit.Collection != "app.bsky.feed.post" { 93 - return nil 94 - } 95 - 96 - var post apibsky.FeedPost 97 - if err := json.Unmarshal(event.Commit.Record, &post); err != nil { 98 - // ignore this 99 - return nil 100 - } 101 - 102 - // we only care about posts that have parents which are replies 103 - if post.Reply == nil || post.Reply.Parent == nil || post.Reply.Parent.Uri == "" { 104 - return nil 105 - } 106 - 107 - subscribedPostURI := post.Reply.Parent.Uri 108 - 109 - // look for posts that are "subscribe" so that we can add the post URI to a list of posts we want to find replies for 110 - if strings.Contains(post.Text, "/subscribe") { 111 - // For now just look for me 112 - if event.Did != "did:plc:dadhhalkfcq3gucaq25hjqon" { 113 - return nil 114 - } 115 - slog.Info("a post that's subscribing to another post. Adding to posts to look for", "subscribed post URI", subscribedPostURI) 116 - return h.addDidToSubscribedPost(subscribedPostURI, event.Did, event.Commit.RKey) 117 - } 118 - 119 - // see if the post is a reply to a post we are subscribed to 120 - subscribedDids := h.getSubscribedDidsForPost(subscribedPostURI) 121 - if len(subscribedDids) == 0 { 122 - return nil 123 - } 124 - 125 - slog.Info("post is a reply to a post that users are subscribed to", "subscribed post URI", subscribedPostURI, "dids", subscribedDids, "RKey", event.Commit.RKey) 126 - 127 - replyPostURI := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", event.Did, event.Commit.RKey) 128 - h.feedGenerator.AddToFeedPosts(subscribedDids, subscribedPostURI, replyPostURI) 129 - return nil 130 - } 131 - 132 - func (h *handler) handleDeleteEvent(_ context.Context, event *models.Event) error { 133 - if event.Commit.Collection != "app.bsky.feed.post" { 134 - return nil 135 - } 136 - 137 - // temp ignore everyone but me 138 - if event.Did != "did:plc:dadhhalkfcq3gucaq25hjqon" { 139 - return nil 140 - } 141 - slog.Info("delete event received", "did", event.Did, "rkey", event.Commit.RKey) 142 - subscribedPostURI, err := h.store.GetSubscribedPostURI(event.Did, event.Commit.RKey) 143 - if err != nil { 144 - slog.Error("get subscribed post URI", "error", err, "rkey", event.Commit.RKey, "user DID", event.Did) 145 - return fmt.Errorf("get subscribed post URI: %w", err) 146 - } 147 - 148 - // delete from feeds for the subscribedPostURI and the users DID first. This is so that if this fails, it can be tried again and the 149 - // subscription will be still there 150 - err = h.store.DeleteFeedPostsForSubscribedPostURIandUserDID(subscribedPostURI, event.Did) 151 - if err != nil { 152 - slog.Error("delete feed items for subscribedPostURI and user", "error", err, "subscribedPostURI", subscribedPostURI, "user DID", event.Did) 153 - return fmt.Errorf("delete feed items for subscribedPostURI and user: %w", err) 154 - } 155 - 156 - // delete from subscriptions for the postURI and the users DID now that we have cleaned up the feeds 157 - err = h.store.DeleteSubscriptionForUser(event.Did, subscribedPostURI) 158 - if err != nil { 159 - slog.Error("delete subscription for user", "error", err, "subscribedPostURI", subscribedPostURI, "user DID", event.Did) 160 - return fmt.Errorf("delete subscription and user: %w", err) 161 - } 162 - 163 - return nil 164 - } 165 - 166 - func (h *handler) addDidToSubscribedPost(subscribedPostURI, userDid, subscriptionPostRkey string) error { 167 - err := h.store.AddSubscriptionForPost(subscribedPostURI, userDid, subscriptionPostRkey) 168 - if err != nil { 169 - return fmt.Errorf("add subscription for post: %w", err) 170 - } 171 - return nil 172 - } 173 - 174 - func (h *handler) getSubscribedDidsForPost(postURI string) []string { 175 - dids, err := h.store.GetSubscriptionsForPost(postURI) 176 - if err != nil { 177 - slog.Error("getting subscriptions for post", "error", err) 178 - bugsnag.Notify(err) 179 - } 180 - 181 - return dids 182 - }
-19
feedgenerator.go
··· 3 3 import ( 4 4 "context" 5 5 "fmt" 6 - "log/slog" 7 6 8 - "github.com/bugsnag/bugsnag-go/v2" 9 7 "github.com/willdot/bskyfeedgen/store" 10 8 ) 11 9 12 10 type feedStore interface { 13 - AddFeedPost(feedItem store.FeedPost) error 14 11 GetUsersFeed(usersDID string) ([]store.FeedPost, error) 15 12 } 16 13 ··· 46 43 47 44 return resp, nil 48 45 } 49 - 50 - func (f *FeedGenerator) AddToFeedPosts(usersDids []string, subscribedPostURI, replyPostURI string) { 51 - for _, did := range usersDids { 52 - feedItem := store.FeedPost{ 53 - ReplyURI: replyPostURI, 54 - UserDID: did, 55 - SubscribedPostURI: subscribedPostURI, 56 - } 57 - err := f.store.AddFeedPost(feedItem) 58 - if err != nil { 59 - slog.Error("add users feed item", "error", err, "did", did, "reply post URI", replyPostURI) 60 - bugsnag.Notify(err) 61 - continue 62 - } 63 - } 64 - }
+7 -4
main.go
··· 71 71 72 72 if enableJS == "true" { 73 73 slog.Info("enabling jetstream consume") 74 - go consumeLoop(ctx, jsServerAddr, feeder, store) 74 + go consumeLoop(ctx, jsServerAddr, store) 75 75 } 76 76 77 77 server := NewServer(443, feeder, feedHost, feedDidBase) ··· 87 87 time.Sleep(time.Second) 88 88 } 89 89 90 - func consumeLoop(ctx context.Context, jsServerAddr string, feeder *FeedGenerator, store *store.Store) { 91 - consumer := NewConsumer(jsServerAddr, store) 90 + func consumeLoop(ctx context.Context, jsServerAddr string, store *store.Store) { 91 + handler := handler{ 92 + store: store, 93 + } 94 + consumer := NewConsumer(jsServerAddr, slog.Default(), &handler) 92 95 93 96 retry.Do(func() error { 94 - err := consumer.Consume(ctx, feeder, slog.Default()) 97 + err := consumer.Consume(ctx) 95 98 if err != nil { 96 99 if errors.Is(err, context.Canceled) { 97 100 return nil