this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor to get users DID out of the getfeedskeleton so that we can store feeds per user

+70 -10
+1 -1
consumer.go
··· 100 100 // see if the post is a reply to a post we are subscribed to 101 101 if _, ok := h.parentsToLookFor[post.Reply.Parent.Uri]; ok { 102 102 slog.Info("post is a reply to a parent we are subscribed to", "parent URI", post.Reply.Parent.Uri, "did", event.Did, "RKey", event.Commit.RKey) 103 - h.feedGenerator.AddToFeedPosts(fmt.Sprintf("at://%s/app.bsky.feed.post/%s", event.Did, event.Commit.RKey)) 103 + h.feedGenerator.AddToFeedPosts(event.Did, fmt.Sprintf("at://%s/app.bsky.feed.post/%s", event.Did, event.Commit.RKey)) 104 104 } 105 105 } 106 106 }
+17 -6
feed.go
··· 7 7 8 8 type FeedGenerator struct { 9 9 mu sync.Mutex 10 - posts map[string]struct{} 10 + posts map[string][]string 11 11 } 12 12 13 13 func NewFeedGenerator() *FeedGenerator { 14 14 return &FeedGenerator{ 15 - posts: make(map[string]struct{}), 15 + posts: make(map[string][]string), 16 16 } 17 17 } 18 18 19 - func (f *FeedGenerator) GetFeed(ctx context.Context, feed, cursor string, limit int) (*FeedReponse, error) { 19 + func (f *FeedGenerator) GetFeed(ctx context.Context, userDID, feed, cursor string, limit int) (*FeedReponse, error) { 20 20 f.mu.Lock() 21 21 defer f.mu.Unlock() 22 22 23 + usersFeed, ok := f.posts[userDID] 24 + if !ok { 25 + return nil, nil 26 + } 27 + 23 28 feedItems := make([]FeedItem, 0, len(f.posts)) 24 - for post := range f.posts { 29 + for _, post := range usersFeed { 25 30 feedItems = append(feedItems, FeedItem{ 26 31 Post: post, 27 32 }) ··· 35 40 return resp, nil 36 41 } 37 42 38 - func (f *FeedGenerator) AddToFeedPosts(postURI string) { 43 + func (f *FeedGenerator) AddToFeedPosts(usersDid, postURI string) { 39 44 f.mu.Lock() 40 45 defer f.mu.Unlock() 41 46 // TODO: store this in DB instead 42 - f.posts[postURI] = struct{}{} 47 + usersPosts, ok := f.posts[usersDid] 48 + if !ok { 49 + usersPosts = make([]string, 0, 1) 50 + } 51 + 52 + usersPosts = append(usersPosts, postURI) 53 + f.posts[usersDid] = usersPosts 43 54 }
+2 -1
go.mod
··· 5 5 require ( 6 6 github.com/bluesky-social/indigo v0.0.0-20241031232035-1a73c3fb6841 7 7 github.com/bluesky-social/jetstream v0.0.0-20241031234625-0ab10bd041fe 8 + github.com/golang-jwt/jwt/v5 v5.2.1 9 + github.com/gorilla/websocket v1.5.1 8 10 ) 9 11 10 12 require ( ··· 17 19 github.com/goccy/go-json v0.10.2 // indirect 18 20 github.com/gogo/protobuf v1.3.2 // indirect 19 21 github.com/google/uuid v1.6.0 // indirect 20 - github.com/gorilla/websocket v1.5.1 // indirect 21 22 github.com/hashicorp/go-cleanhttp v0.5.2 // indirect 22 23 github.com/hashicorp/go-retryablehttp v0.7.5 // indirect 23 24 github.com/hashicorp/golang-lru v1.0.2 // indirect
+2
go.sum
··· 26 26 github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= 27 27 github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= 28 28 github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= 29 + github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= 30 + github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= 29 31 github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= 30 32 github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= 31 33 github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
+48 -2
server.go
··· 7 7 "log/slog" 8 8 "net/http" 9 9 "strconv" 10 + "strings" 11 + 12 + "github.com/bluesky-social/indigo/atproto/identity" 13 + "github.com/bluesky-social/indigo/atproto/syntax" 14 + "github.com/golang-jwt/jwt/v5" 10 15 ) 11 16 12 17 type Feeder interface { 13 - GetFeed(ctx context.Context, feed, cursor string, limit int) (*FeedReponse, error) 18 + GetFeed(ctx context.Context, userDID, feed, cursor string, limit int) (*FeedReponse, error) 14 19 } 15 20 16 21 type Server struct { ··· 92 97 } 93 98 94 99 cursor := params.Get("cursor") 100 + usersDID, err := validateAuth(r) 101 + if err != nil { 102 + slog.Error("validate auth", "error", err) 103 + http.Error(w, "validate auth", http.StatusUnauthorized) 104 + return 105 + } 95 106 96 - resp, err := s.feeder.GetFeed(r.Context(), feed, cursor, limit) 107 + resp, err := s.feeder.GetFeed(r.Context(), usersDID, feed, cursor, limit) 97 108 if err != nil { 98 109 slog.Error("get feed", "error", err, "feed", feed) 99 110 http.Error(w, "error getting feed", http.StatusInternalServerError) ··· 175 186 176 187 w.Write(b) 177 188 } 189 + 190 + // this extracts the DID of the user that has made the request from the JWT of the auth header 191 + var directory = identity.DefaultDirectory() 192 + 193 + func validateAuth(r *http.Request) (string, error) { 194 + headerValues := r.Header["Authorization"] 195 + if len(headerValues) != 1 { 196 + return "", fmt.Errorf("missing authorization header") 197 + } 198 + token := strings.TrimSpace(strings.Replace(headerValues[0], "Bearer ", "", 1)) 199 + 200 + nsid := strings.Replace(r.URL.Path, "/xrpc/", "", 1) 201 + 202 + parsedToken, err := jwt.ParseWithClaims(token, jwt.MapClaims{}, func(token *jwt.Token) (interface{}, error) { 203 + did := syntax.DID(token.Claims.(jwt.MapClaims)["iss"].(string)) 204 + identity, err := directory.LookupDID(r.Context(), did) 205 + if err != nil { 206 + return nil, fmt.Errorf("unable to resolve did %s: %s", did, err) 207 + } 208 + key, err := identity.PublicKey() 209 + if err != nil { 210 + return nil, fmt.Errorf("signing key not found for did %s: %s", did, err) 211 + } 212 + return key, nil 213 + }) 214 + if err != nil { 215 + return "", fmt.Errorf("invalid token: %s", err) 216 + } 217 + 218 + claims := parsedToken.Claims.(jwt.MapClaims) 219 + if claims["lxm"] != nsid { 220 + return "", fmt.Errorf("bad jwt lexicon method (\"lxm\"). must match: %s", nsid) 221 + } 222 + return claims["iss"].(string), nil 223 + }