this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

implement subscriptions

+97 -47
+20 -36
consumer.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "database/sql" 5 6 "encoding/json" 6 7 "fmt" 7 8 "log/slog" 8 9 "strings" 9 - "sync" 10 10 "time" 11 11 12 12 apibsky "github.com/bluesky-social/indigo/api/bsky" 13 13 "github.com/bluesky-social/jetstream/pkg/client" 14 14 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 15 15 "github.com/bluesky-social/jetstream/pkg/models" 16 + "github.com/bugsnag/bugsnag-go/v2" 16 17 ) 17 18 18 19 type consumer struct { ··· 35 36 36 37 func (con *consumer) Consume(ctx context.Context, feedGen *FeedGenerator, logger *slog.Logger) error { 37 38 h := &handler{ 38 - seenSeqs: make(map[int64]struct{}), 39 - feedGenerator: feedGen, 40 - parentsToLookFor: make(map[string]map[string]struct{}), 39 + seenSeqs: make(map[int64]struct{}), 40 + feedGenerator: feedGen, 41 + db: feedGen.db, 41 42 } 42 43 43 44 scheduler := sequential.NewScheduler("jetstream_localdev", logger, h.HandleEvent) ··· 60 61 } 61 62 62 63 type handler struct { 63 - seenSeqs map[int64]struct{} 64 - highwater int64 65 - feedGenerator *FeedGenerator 66 - mu sync.Mutex 67 - parentsToLookFor map[string]map[string]struct{} 64 + seenSeqs map[int64]struct{} 65 + highwater int64 66 + feedGenerator *FeedGenerator 67 + db *sql.DB 68 68 } 69 69 70 70 func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error { ··· 90 90 // look for posts where I've "subsribed" so that we can add the parent URI to a list of replies to that parent to look for 91 91 if strings.Contains(post.Text, "/subscribe") && event.Did == "did:plc:dadhhalkfcq3gucaq25hjqon" { 92 92 slog.Info("a post that's subscribing to a parent. Adding to parents to look for", "parent URI", parentURI) 93 - h.addDidToSubscribedParent(parentURI, event.Did) 94 - return nil 93 + return h.addDidToSubscribedParent(parentURI, event.Did) 95 94 } 96 95 97 96 // see if the post is a reply to a post we are subscribed to ··· 102 101 103 102 slog.Info("post is a reply to a parent that users are subscribed to", "parent URI", parentURI, "dids", subscribedDids, "RKey", event.Commit.RKey) 104 103 105 - h.feedGenerator.AddToFeedPosts(subscribedDids, fmt.Sprintf("at://%s/app.bsky.feed.post/%s", event.Did, event.Commit.RKey)) 104 + h.feedGenerator.AddToFeedPosts(subscribedDids, parentURI, fmt.Sprintf("at://%s/app.bsky.feed.post/%s", event.Did, event.Commit.RKey)) 106 105 } 107 106 } 108 107 return nil 109 108 } 110 109 111 - func (h *handler) addDidToSubscribedParent(parentURI, did string) { 112 - h.mu.Lock() 113 - defer h.mu.Unlock() 114 - 115 - subscribedDids, ok := h.parentsToLookFor[parentURI] 116 - if !ok { 117 - h.parentsToLookFor[parentURI] = map[string]struct{}{ 118 - did: {}, 119 - } 120 - return 110 + func (h *handler) addDidToSubscribedParent(parentURI, userDid string) error { 111 + err := addSubscriptionForParent(h.db, parentURI, userDid) 112 + if err != nil { 113 + return fmt.Errorf("add subscription for parent: %w", err) 121 114 } 122 - 123 - subscribedDids[did] = struct{}{} 124 - h.parentsToLookFor[parentURI] = subscribedDids 115 + return nil 125 116 } 126 117 127 118 func (h *handler) getSubscribedDidsForParent(parentURI string) []string { 128 - h.mu.Lock() 129 - defer h.mu.Unlock() 130 - 131 - subscribedDids, ok := h.parentsToLookFor[parentURI] 132 - if !ok { 133 - return nil 134 - } 135 - 136 - dids := make([]string, 0, len(subscribedDids)) 137 - for did := range subscribedDids { 138 - dids = append(dids, did) 119 + dids, err := getSubscriptionsForParent(h.db, parentURI) 120 + if err != nil { 121 + slog.Error("getting subscriptions for parent", "error", err) 122 + bugsnag.Notify(err) 139 123 } 140 124 141 125 return dids
+73 -8
database.go
··· 32 32 return nil, fmt.Errorf("creating feed table: %w", err) 33 33 } 34 34 35 + err = createSubscriptionTable(db) 36 + if err != nil { 37 + return nil, fmt.Errorf("creating subscription table: %w", err) 38 + } 39 + 35 40 return db, nil 36 41 } 37 42 ··· 52 57 createFeedTableSQL := `CREATE TABLE IF NOT EXISTS feed ( 53 58 "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, 54 59 "uri" TEXT, 55 - "userDID" TEXT 60 + "userDID" TEXT, 61 + "parentURI" TEXT, 62 + UNIQUE(uri, userDID) 56 63 );` 57 64 58 65 slog.Info("Create feed table...") 59 66 statement, err := db.Prepare(createFeedTableSQL) 60 67 if err != nil { 61 - return fmt.Errorf("prepare DB statement to create table: %w", err) 68 + return fmt.Errorf("prepare DB statement to create feeds table: %w", err) 62 69 } 63 70 _, err = statement.Exec() 64 71 if err != nil { 65 - return fmt.Errorf("exec sql statement to create table: %w", err) 72 + return fmt.Errorf("exec sql statement to create feeds table: %w", err) 66 73 } 67 74 slog.Info("feed table created") 68 75 69 76 return nil 70 77 } 71 78 79 + func createSubscriptionTable(db *sql.DB) error { 80 + createSubscriptionTableSQL := `CREATE TABLE IF NOT EXISTS subscriptions ( 81 + "id" integer NOT NULL PRIMARY KEY AUTOINCREMENT, 82 + "parentURI" TEXT, 83 + "userDID" TEXT, 84 + UNIQUE(parentURI, userDID) 85 + );` 86 + 87 + slog.Info("Create subscriptions table...") 88 + statement, err := db.Prepare(createSubscriptionTableSQL) 89 + if err != nil { 90 + return fmt.Errorf("prepare DB statement to create subscriptions table: %w", err) 91 + } 92 + _, err = statement.Exec() 93 + if err != nil { 94 + return fmt.Errorf("exec sql statement to create subscriptions table: %w", err) 95 + } 96 + slog.Info("feed subscriptions created") 97 + 98 + return nil 99 + } 100 + 72 101 type feedItem struct { 73 - ID int 74 - URI string 75 - UserDID string 102 + ID int 103 + URI string 104 + UserDID string 105 + parentURI string 76 106 } 77 107 78 108 func addFeedItem(_ context.Context, db *sql.DB, feedItem feedItem) error { 79 - sql := `INSERT INTO feed (uri, userDID) VALUES (?, ?);` 80 - _, err := db.Exec(sql, feedItem.URI, feedItem.UserDID) 109 + sql := `INSERT INTO feed (uri, userDID, parentURI) VALUES (?, ?, ?);` 110 + _, err := db.Exec(sql, feedItem.URI, feedItem.UserDID, feedItem.parentURI) 81 111 if err != nil { 82 112 return fmt.Errorf("exec insert feed item: %w", err) 83 113 } ··· 103 133 104 134 return feedItems, nil 105 135 } 136 + 137 + type subscription struct { 138 + ID int 139 + ParentURI string 140 + UserDID string 141 + } 142 + 143 + func getSubscriptionsForParent(db *sql.DB, parentURI string) ([]string, error) { 144 + sql := "SELECT id, parentURI, userDID FROM subscription WHERE parentURI = ?" 145 + rows, err := db.Query(sql, parentURI) 146 + if err != nil { 147 + return nil, fmt.Errorf("run query to get subscriptions: %w", err) 148 + } 149 + defer rows.Close() 150 + 151 + dids := make([]string, 0) 152 + for rows.Next() { 153 + var subscription subscription 154 + if err := rows.Scan(&subscription.ID, &subscription.ParentURI, &subscription.UserDID); err != nil { 155 + return nil, fmt.Errorf("scan row: %w", err) 156 + } 157 + dids = append(dids, subscription.UserDID) 158 + } 159 + 160 + return dids, nil 161 + } 162 + 163 + func addSubscriptionForParent(db *sql.DB, parentURI, userDid string) error { 164 + sql := `INSERT INTO subscriptions (parentURI, userDID,) VALUES (?, ?);` 165 + _, err := db.Exec(sql, parentURI, userDid) 166 + if err != nil { 167 + return fmt.Errorf("exec insert subscrption: %w", err) 168 + } 169 + return nil 170 + }
+4 -3
feed.go
··· 42 42 return resp, nil 43 43 } 44 44 45 - func (f *FeedGenerator) AddToFeedPosts(usersDids []string, postURI string) { 45 + func (f *FeedGenerator) AddToFeedPosts(usersDids []string, parentURI, postURI string) { 46 46 for _, did := range usersDids { 47 47 feedItem := feedItem{ 48 - URI: postURI, 49 - UserDID: did, 48 + URI: postURI, 49 + UserDID: did, 50 + parentURI: parentURI, 50 51 } 51 52 err := addFeedItem(context.Background(), f.db, feedItem) 52 53 if err != nil {