this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

more refactoring

+72 -57
+22 -13
consumer.go
··· 14 14 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 15 15 "github.com/bluesky-social/jetstream/pkg/models" 16 16 "github.com/bugsnag/bugsnag-go/v2" 17 - "github.com/willdot/bskyfeedgen/store" 18 17 ) 18 + 19 + type ConsumerStore interface { 20 + GetSubscriptionsForPost(postURI string) ([]string, error) 21 + AddSubscriptionForPost(subscribedPostURI, userDid, subscriptionPostRkey string) error 22 + GetSubscribedPostURI(userDID, subscriptionPostRkey string) (string, error) 23 + DeleteSubscriptionForUser(userDID, postURI string) error 24 + DeleteFeedPostsForSubscribedPostURIandUserDID(subscribedPostURI, userDID string) error 25 + } 19 26 20 27 type consumer struct { 21 - cfg *client.ClientConfig 28 + cfg *client.ClientConfig 29 + store ConsumerStore 22 30 } 23 31 24 - func NewConsumer(jsAddr string) *consumer { 32 + func NewConsumer(jsAddr string, store ConsumerStore) *consumer { 25 33 cfg := client.DefaultClientConfig() 26 34 if jsAddr != "" { 27 35 cfg.WebsocketURL = jsAddr ··· 31 39 } 32 40 cfg.WantedDids = []string{} 33 41 return &consumer{ 34 - cfg: cfg, 42 + cfg: cfg, 43 + store: store, 35 44 } 36 45 } 37 46 38 47 func (con *consumer) Consume(ctx context.Context, feedGen *FeedGenerator, logger *slog.Logger) error { 39 48 h := &handler{ 40 - seenSeqs: make(map[int64]struct{}), 41 49 feedGenerator: feedGen, 42 - store: *feedGen.store, 50 + store: con.store, 43 51 } 44 52 45 53 scheduler := sequential.NewScheduler("jetstream_localdev", logger, h.HandleEvent) 46 54 defer scheduler.Shutdown() 47 55 48 - // TODO: logger 49 - c, err := client.NewClient(con.cfg, slog.Default(), scheduler) 56 + c, err := client.NewClient(con.cfg, logger, scheduler) 50 57 if err != nil { 51 58 return fmt.Errorf("failed to create client: %w", err) 52 59 } ··· 62 69 } 63 70 64 71 type handler struct { 65 - seenSeqs map[int64]struct{} 66 - highwater int64 67 72 feedGenerator *FeedGenerator 68 - store store.Store 73 + store ConsumerStore 69 74 } 70 75 71 76 func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error { ··· 102 107 subscribedPostURI := post.Reply.Parent.Uri 103 108 104 109 // look for posts that are "subscribe" so that we can add the post URI to a list of posts we want to find replies for 105 - if strings.Contains(post.Text, "/subscribe") && event.Did == "did:plc:dadhhalkfcq3gucaq25hjqon" { 110 + if strings.Contains(post.Text, "/subscribe") { 111 + // For now just look for me 112 + if event.Did != "did:plc:dadhhalkfcq3gucaq25hjqon" { 113 + return nil 114 + } 106 115 slog.Info("a post that's subscribing to another post. Adding to posts to look for", "subscribed post URI", subscribedPostURI) 107 116 return h.addDidToSubscribedPost(subscribedPostURI, event.Did, event.Commit.RKey) 108 117 } ··· 138 147 139 148 // delete from feeds for the subscribedPostURI and the users DID first. This is so that if this fails, it can be tried again and the 140 149 // subscription will be still there 141 - err = h.store.DeleteFeedItemsForSubscribedPostURIandUserDID(subscribedPostURI, event.Did) 150 + err = h.store.DeleteFeedPostsForSubscribedPostURIandUserDID(subscribedPostURI, event.Did) 142 151 if err != nil { 143 152 slog.Error("delete feed items for subscribedPostURI and user", "error", err, "subscribedPostURI", subscribedPostURI, "user DID", event.Did) 144 153 return fmt.Errorf("delete feed items for subscribedPostURI and user: %w", err)
+10 -5
feed.go feedgenerator.go
··· 9 9 "github.com/willdot/bskyfeedgen/store" 10 10 ) 11 11 12 + type feedStore interface { 13 + AddFeedPost(feedItem store.FeedPost) error 14 + GetUsersFeed(usersDID string) ([]store.FeedPost, error) 15 + } 16 + 12 17 type FeedGenerator struct { 13 - store *store.Store 18 + store feedStore 14 19 } 15 20 16 - func NewFeedGenerator(store *store.Store) *FeedGenerator { 21 + func NewFeedGenerator(store feedStore) *FeedGenerator { 17 22 return &FeedGenerator{ 18 23 store: store, 19 24 } ··· 24 29 Feed: make([]FeedItem, 0, 0), 25 30 } 26 31 27 - usersFeed, err := f.store.GetUsersFeedItems(userDID) 32 + usersFeed, err := f.store.GetUsersFeed(userDID) 28 33 if err != nil { 29 34 return resp, fmt.Errorf("get users feed items from DB: %w", err) 30 35 } ··· 44 49 45 50 func (f *FeedGenerator) AddToFeedPosts(usersDids []string, subscribedPostURI, replyPostURI string) { 46 51 for _, did := range usersDids { 47 - feedItem := store.FeedItem{ 52 + feedItem := store.FeedPost{ 48 53 ReplyURI: replyPostURI, 49 54 UserDID: did, 50 55 SubscribedPostURI: subscribedPostURI, 51 56 } 52 - err := f.store.AddFeedItem(feedItem) 57 + err := f.store.AddFeedPost(feedItem) 53 58 if err != nil { 54 59 slog.Error("add users feed item", "error", err, "did", did, "reply post URI", replyPostURI) 55 60 bugsnag.Notify(err)
+25 -24
main.go
··· 3 3 import ( 4 4 "context" 5 5 "errors" 6 - "fmt" 7 6 "log/slog" 8 7 "os" 9 8 "os/signal" ··· 25 24 26 25 signals := make(chan os.Signal, 1) 27 26 signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT) 28 - ctx, cancel := context.WithCancel(context.Background()) 29 - defer cancel() 30 27 28 + enableJS := os.Getenv("ENABLE_JETSTREAM") 31 29 bugsnagAPIKey := os.Getenv("BUGSNAG_API_KEY") 30 + 31 + feedDidBase := os.Getenv("FEED_DID_BASE") 32 + if feedDidBase == "" { 33 + slog.Error("FEED_DID_BASE not set") 34 + os.Exit(1) 35 + } 36 + feedHost := os.Getenv("FEED_HOST_NAME") 37 + if feedHost == "" { 38 + slog.Error("FEED_HOST_NAME not set") 39 + os.Exit(1) 40 + } 41 + dbMountPath := os.Getenv("RAILWAY_VOLUME_MOUNT_PATH") 42 + if dbMountPath == "" { 43 + slog.Error("RAILWAY_VOLUME_MOUNT_PATH env not set") 44 + os.Exit(1) 45 + } 46 + 32 47 if bugsnagAPIKey != "" { 33 48 bugsnag.Configure(bugsnag.Configuration{ 34 49 APIKey: bugsnagAPIKey, ··· 36 51 // The import paths for the Go packages containing your source files 37 52 ProjectPackages: []string{"main", "github.com/willdot/bskyfeedgen"}, 38 53 // more configuration options 54 + AutoCaptureSessions: false, 39 55 }) 40 56 } 41 57 42 - dbMountPath := os.Getenv("RAILWAY_VOLUME_MOUNT_PATH") 43 - if dbMountPath == "" { 44 - bugsnag.Notify(fmt.Errorf("RAILWAY_VOLUME_MOUNT_PATH env not set")) 45 - return 46 - } 47 58 dbFilename := path.Join(dbMountPath, "database.db") 48 - 49 59 store, err := store.New(dbFilename) 50 60 if err != nil { 51 61 slog.Error("create new store", "error", err) ··· 56 66 57 67 feeder := NewFeedGenerator(store) 58 68 59 - feedDidBase := os.Getenv("FEED_DID_BASE") 60 - if feedDidBase == "" { 61 - slog.Error("FEED_DID_BASE not set") 62 - os.Exit(1) 63 - } 64 - feedHost := os.Getenv("FEED_HOST_NAME") 65 - if feedHost == "" { 66 - slog.Error("FEED_HOST_NAME not set") 67 - os.Exit(1) 68 - } 69 + ctx, cancel := context.WithCancel(context.Background()) 70 + defer cancel() 69 71 70 - enableJS := os.Getenv("ENABLE_JETSTREAM") 71 72 if enableJS == "true" { 72 73 slog.Info("enabling jetstream consume") 73 - go consumeLoop(ctx, jsServerAddr, feeder) 74 + go consumeLoop(ctx, jsServerAddr, feeder, store) 74 75 } 75 76 76 77 server := NewServer(443, feeder, feedHost, feedDidBase) ··· 86 87 time.Sleep(time.Second) 87 88 } 88 89 89 - func consumeLoop(ctx context.Context, jsServerAddr string, feeder *FeedGenerator) { 90 - consumer := NewConsumer(jsServerAddr) 90 + func consumeLoop(ctx context.Context, jsServerAddr string, feeder *FeedGenerator, store *store.Store) { 91 + consumer := NewConsumer(jsServerAddr, store) 91 92 92 93 retry.Do(func() error { 93 94 err := consumer.Consume(ctx, feeder, slog.Default()) ··· 99 100 return err 100 101 } 101 102 return nil 102 - }, retry.Attempts(0)) 103 + }, retry.Attempts(0)) // retry indefinitly until context canceled 103 104 }
+1 -1
server.go
··· 70 70 71 71 feed := params.Get("feed") 72 72 if feed == "" { 73 - slog.Error("missing query param", "host", r.RemoteAddr) 73 + slog.Error("missing feed query param", "host", r.RemoteAddr) 74 74 http.Error(w, "missing feed query param", http.StatusBadRequest) 75 75 return 76 76 }
+14 -14
store/feed.go
··· 29 29 return nil 30 30 } 31 31 32 - type FeedItem struct { 32 + type FeedPost struct { 33 33 ID int 34 34 ReplyURI string 35 35 UserDID string 36 36 SubscribedPostURI string 37 37 } 38 38 39 - func (s *Store) AddFeedItem(feedItem FeedItem) error { 39 + func (s *Store) AddFeedPost(feedPost FeedPost) error { 40 40 sql := `INSERT INTO feed (replyURI, userDID, subscribedPostURI) VALUES (?, ?, ?) ON CONFLICT(replyURI, userDID) DO NOTHING;` 41 - _, err := s.db.Exec(sql, feedItem.ReplyURI, feedItem.UserDID, feedItem.SubscribedPostURI) 41 + _, err := s.db.Exec(sql, feedPost.ReplyURI, feedPost.UserDID, feedPost.SubscribedPostURI) 42 42 if err != nil { 43 43 return fmt.Errorf("exec insert feed item: %w", err) 44 44 } 45 45 return nil 46 46 } 47 47 48 - func (s *Store) GetUsersFeedItems(usersDID string) ([]FeedItem, error) { 48 + func (s *Store) GetUsersFeed(usersDID string) ([]FeedPost, error) { 49 49 sql := "SELECT id, replyURI, userDID FROM feed WHERE userDID = ?;" 50 50 rows, err := s.db.Query(sql, usersDID) 51 51 if err != nil { 52 - return nil, fmt.Errorf("run query to get users feed item: %w", err) 52 + return nil, fmt.Errorf("run query to get users feed posts: %w", err) 53 53 } 54 54 defer rows.Close() 55 55 56 - feedItems := make([]FeedItem, 0) 56 + feedPosts := make([]FeedPost, 0) 57 57 for rows.Next() { 58 - var feedItem FeedItem 59 - if err := rows.Scan(&feedItem.ID, &feedItem.ReplyURI, &feedItem.UserDID); err != nil { 58 + var feedPost FeedPost 59 + if err := rows.Scan(&feedPost.ID, &feedPost.ReplyURI, &feedPost.UserDID); err != nil { 60 60 return nil, fmt.Errorf("scan row: %w", err) 61 61 } 62 - feedItems = append(feedItems, feedItem) 62 + feedPosts = append(feedPosts, feedPost) 63 63 } 64 64 65 - return feedItems, nil 65 + return feedPosts, nil 66 66 } 67 67 68 - func (s *Store) DeleteFeedItemsForSubscribedPostURIandUserDID(subscribedPostURI, userDID string) error { 68 + func (s *Store) DeleteFeedPostsForSubscribedPostURIandUserDID(subscribedPostURI, userDID string) error { 69 69 sql := "DELETE FROM feed WHERE subscribedPostURI = ? AND userDID = ?;" 70 70 statement, err := s.db.Prepare(sql) 71 71 if err != nil { 72 - return fmt.Errorf("prepare delete feed items: %w", err) 72 + return fmt.Errorf("prepare delete feed posts: %w", err) 73 73 } 74 74 res, err := statement.Exec(subscribedPostURI, userDID) 75 75 if err != nil { 76 - return fmt.Errorf("exec delete feed items: %w", err) 76 + return fmt.Errorf("exec delete feed posts: %w", err) 77 77 } 78 78 79 79 n, _ := res.RowsAffected() 80 80 81 - slog.Info("delete feed res", "affected rows", n) 81 + slog.Info("delete feed posts result", "affected rows", n) 82 82 return nil 83 83 }