···11-# # Use the Go 1.23 alpine official image
22-# # https://hub.docker.com/_/golang
33-# FROM golang:1.23-alpine
44-55-# # Create and change to the app directory.
66-# WORKDIR /app
77-88-# # Copy go mod and sum files
99-# COPY go.mod go.sum ./
1010-1111-# # Copy local code to the container image.
1212-# COPY . ./
1313-1414-# # Install project dependencies
1515-# RUN CGO_ENABLED=1 go mod download
1616-1717-# # Build the app
1818-# RUN go build -o app
1919-2020-# # Run the service on container startup.
2121-# ENTRYPOINT ["./app"]
+28-29
consumer.go
···2233import (
44 "context"
55- "database/sql"
55+66 "encoding/json"
77 "fmt"
88 "log/slog"
···1414 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
1515 "github.com/bluesky-social/jetstream/pkg/models"
1616 "github.com/bugsnag/bugsnag-go/v2"
1717+ "github.com/willdot/bskyfeedgen/store"
1718)
18191920type consumer struct {
···3839 h := &handler{
3940 seenSeqs: make(map[int64]struct{}),
4041 feedGenerator: feedGen,
4141- db: feedGen.db,
4242+ store: *feedGen.store,
4243 }
43444445 scheduler := sequential.NewScheduler("jetstream_localdev", logger, h.HandleEvent)
···6465 seenSeqs map[int64]struct{}
6566 highwater int64
6667 feedGenerator *FeedGenerator
6767- db *sql.DB
6868+ store store.Store
6869}
69707071func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error {
···9899 return nil
99100 }
100101101101- parentURI := post.Reply.Parent.Uri
102102+ subscribedPostURI := post.Reply.Parent.Uri
102103103103- // look for posts where I've "subsribed" so that we can add the parent URI to a list of replies to that parent to look for
104104+ // look for posts that are "subscribe" so that we can add the post URI to a list of posts we want to find replies for
104105 if strings.Contains(post.Text, "/subscribe") && event.Did == "did:plc:dadhhalkfcq3gucaq25hjqon" {
105105- slog.Info("a post that's subscribing to a parent. Adding to parents to look for", "parent URI", parentURI)
106106- return h.addDidToSubscribedParent(parentURI, event.Did, event.Commit.RKey)
106106+ slog.Info("a post that's subscribing to another post. Adding to posts to look for", "subscribed post URI", subscribedPostURI)
107107+ return h.addDidToSubscribedPost(subscribedPostURI, event.Did, event.Commit.RKey)
107108 }
108109109110 // see if the post is a reply to a post we are subscribed to
110110- subscribedDids := h.getSubscribedDidsForParent(parentURI)
111111+ subscribedDids := h.getSubscribedDidsForPost(subscribedPostURI)
111112 if len(subscribedDids) == 0 {
112113 return nil
113114 }
114115115115- slog.Info("post is a reply to a parent that users are subscribed to", "parent URI", parentURI, "dids", subscribedDids, "RKey", event.Commit.RKey)
116116+ slog.Info("post is a reply to a post that users are subscribed to", "subscribed post URI", subscribedPostURI, "dids", subscribedDids, "RKey", event.Commit.RKey)
116117117117- h.feedGenerator.AddToFeedPosts(subscribedDids, parentURI, fmt.Sprintf("at://%s/app.bsky.feed.post/%s", event.Did, event.Commit.RKey))
118118+ replyPostURI := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", event.Did, event.Commit.RKey)
119119+ h.feedGenerator.AddToFeedPosts(subscribedDids, subscribedPostURI, replyPostURI)
118120 return nil
119121}
120122···128130 return nil
129131 }
130132 slog.Info("delete event received", "did", event.Did, "rkey", event.Commit.RKey)
131131-132132- parentURI, err := getSubscribingPostParentURI(h.db, event.Did, event.Commit.RKey)
133133+ subscribedPostURI, err := h.store.GetSubscribedPostURI(event.Did, event.Commit.RKey)
133134 if err != nil {
134134- slog.Error("get subscribing post parent URI", "error", err, "rkey", event.Commit.RKey, "user DID", event.Did)
135135- return fmt.Errorf("get subscribing post parent URI: %w", err)
135135+ slog.Error("get subscribed post URI", "error", err, "rkey", event.Commit.RKey, "user DID", event.Did)
136136+ return fmt.Errorf("get subscribed post URI: %w", err)
136137 }
137138138138- slog.Info("delete parent URI", "parent URI", parentURI, "rkey", event.Commit.RKey)
139139-140140- // delete from feeds for the parentURI and the users DID first. This is so that if this fails, it can be tried again and the
139139+ // delete from feeds for the subscribedPostURI and the users DID first. This is so that if this fails, it can be tried again and the
141140 // subscription will be still there
142142- err = deleteFeedItemsForParentURIandUserDID(h.db, parentURI, event.Did)
141141+ err = h.store.DeleteFeedItemsForSubscribedPostURIandUserDID(subscribedPostURI, event.Did)
143142 if err != nil {
144144- slog.Error("delete feed items for parentURI and user", "error", err, "parentURI", parentURI, "user DID", event.Did)
145145- return fmt.Errorf("delete feed items for parentURI and user: %w", err)
143143+ slog.Error("delete feed items for subscribedPostURI and user", "error", err, "subscribedPostURI", subscribedPostURI, "user DID", event.Did)
144144+ return fmt.Errorf("delete feed items for subscribedPostURI and user: %w", err)
146145 }
147146148148- // delete from subscriptions for the parentURI and the users DID now that we have cleaned up the feeds
149149- err = deleteSubscriptionForUser(h.db, event.Did, parentURI)
147147+ // delete from subscriptions for the postURI and the users DID now that we have cleaned up the feeds
148148+ err = h.store.DeleteSubscriptionForUser(event.Did, subscribedPostURI)
150149 if err != nil {
151151- slog.Error("delete subscription for user", "error", err, "parentURI", parentURI, "user DID", event.Did)
150150+ slog.Error("delete subscription for user", "error", err, "subscribedPostURI", subscribedPostURI, "user DID", event.Did)
152151 return fmt.Errorf("delete subscription and user: %w", err)
153152 }
154153155154 return nil
156155}
157156158158-func (h *handler) addDidToSubscribedParent(parentURI, userDid, rkey string) error {
159159- err := addSubscriptionForParent(h.db, parentURI, userDid, rkey)
157157+func (h *handler) addDidToSubscribedPost(subscribedPostURI, userDid, rkey string) error {
158158+ err := h.store.AddSubscriptionForPost(subscribedPostURI, userDid, rkey)
160159 if err != nil {
161161- return fmt.Errorf("add subscription for parent: %w", err)
160160+ return fmt.Errorf("add subscription for post: %w", err)
162161 }
163162 return nil
164163}
165164166166-func (h *handler) getSubscribedDidsForParent(parentURI string) []string {
167167- dids, err := getSubscriptionsForParent(h.db, parentURI)
165165+func (h *handler) getSubscribedDidsForPost(postURI string) []string {
166166+ dids, err := h.store.GetSubscriptionsForPost(postURI)
168167 if err != nil {
169169- slog.Error("getting subscriptions for parent", "error", err)
168168+ slog.Error("getting subscriptions for post", "error", err)
170169 bugsnag.Notify(err)
171170 }
172171