this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

consume from JS and if the post is /subscribe add the parent URI to the posts to return in the feed

+73 -39
+15 -22
consumer.go
··· 34 34 } 35 35 } 36 36 37 - func (con *consumer) Consume(ctx context.Context, logger *slog.Logger) error { 37 + func (con *consumer) Consume(ctx context.Context, feedGen *FeedGenerator, logger *slog.Logger) error { 38 38 h := &handler{ 39 - seenSeqs: make(map[int64]struct{}), 39 + seenSeqs: make(map[int64]struct{}), 40 + feedGenerator: feedGen, 40 41 } 41 42 42 43 scheduler := sequential.NewScheduler("jetstream_localdev", logger, h.HandleEvent) ··· 49 50 50 51 cursor := time.Now().Add(5 * -time.Minute).UnixMicro() 51 52 52 - // // Every 5 seconds print the events read and bytes read and average event size 53 - // go func() { 54 - // ticker := time.NewTicker(5 * time.Second) 55 - // for { 56 - // select { 57 - // case <-ticker.C: 58 - // eventsRead := c.EventsRead.Load() 59 - // if eventsRead == 0 { 60 - // continue 61 - // } 62 - // bytesRead := c.BytesRead.Load() 63 - // avgEventSize := bytesRead / eventsRead 64 - // logger.Info("stats", "events_read", eventsRead, "bytes_read", bytesRead, "avg_event_size", avgEventSize) 65 - // } 66 - // } 67 - // }() 68 - 69 53 if err := c.ConnectAndRead(ctx, &cursor); err != nil { 70 54 return fmt.Errorf("connect and read: %w", err) 71 55 } ··· 75 59 } 76 60 77 61 type handler struct { 78 - seenSeqs map[int64]struct{} 79 - highwater int64 62 + seenSeqs map[int64]struct{} 63 + highwater int64 64 + feedGenerator *FeedGenerator 80 65 } 81 66 82 67 func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error { ··· 89 74 return fmt.Errorf("failed to unmarshal post: %w", err) 90 75 } 91 76 92 - fmt.Printf("%v |(%s)| %s\n", time.UnixMicro(event.TimeUS).Local().Format("15:04:05"), event.Did, post.Text) 77 + // only look for posts where I've "subsribed" 78 + if post.Text != "/subscribe" { 79 + return nil 80 + } 81 + 82 + if post.Reply != nil && post.Reply.Parent != nil && post.Reply.Parent.Uri != "" { 83 + slog.Info("it's a reply with a parent! Adding to feeds", "parent URI", post.Reply.Parent.Uri) 84 + h.feedGenerator.AddToFeed(post.Reply.Parent.Uri) 85 + } 93 86 } 94 87 } 95 88
+33 -8
feed.go
··· 1 1 package main 2 2 3 - import "context" 3 + import ( 4 + "context" 5 + "sync" 6 + ) 4 7 5 8 type FeedGenerator struct { 9 + mu sync.Mutex 6 10 feeds []string 7 11 } 8 12 ··· 12 16 } 13 17 14 18 func (f *FeedGenerator) GetFeed(ctx context.Context, feed, cursor string, limit int) (*FeedReponse, error) { 15 - // TODO: get something from a database 19 + // TODO: get something from a database instead 20 + // resp := &FeedReponse{ 21 + // Feed: []FeedItem{ 22 + // { 23 + // Post: "at://did:plc:dadhhalkfcq3gucaq25hjqon/app.bsky.feed.post/3l7j5ma2si42r", 24 + // FeedContext: "this is some additional context", 25 + // }, 26 + // }, 27 + // Cursor: "", 28 + // } 29 + f.mu.Lock() 30 + defer f.mu.Unlock() 31 + feedItems := make([]FeedItem, 0, len(f.feeds)) 32 + for _, feed := range f.feeds { 33 + feedItems = append(feedItems, FeedItem{ 34 + Post: feed, 35 + }) 36 + } 37 + 16 38 resp := &FeedReponse{ 17 - Feed: []FeedItme{ 18 - { 19 - Post: "at://did:plc:dadhhalkfcq3gucaq25hjqon/app.bsky.feed.post/3l7j5ma2si42r", 20 - FeedContext: "this is some additional context", 21 - }, 22 - }, 39 + Feed: feedItems, 23 40 Cursor: "", 24 41 } 42 + 25 43 return resp, nil 26 44 } 45 + 46 + func (f *FeedGenerator) AddToFeed(postURI string) { 47 + f.mu.Lock() 48 + defer f.mu.Unlock() 49 + // TODO: store this in DB instead 50 + f.feeds = append(f.feeds, postURI) 51 + }
+23 -7
main.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "errors" 5 6 "log/slog" 6 7 "os" 7 8 "os/signal" 8 9 "syscall" 10 + 11 + "github.com/gorilla/websocket" 9 12 ) 10 13 11 14 const ( ··· 33 36 34 37 enableJS := os.Getenv("ENABLE_JETSTREAM") 35 38 if enableJS == "true" { 36 - consumer := NewConsumer(jsServerAddr) 37 - go func() { 38 - err := consumer.Consume(ctx, slog.Default()) 39 - if err != nil { 40 - slog.Error("consume", "error", err) 41 - } 42 - }() 39 + go consumeLoop(ctx, jsServerAddr, feeder) 43 40 } 44 41 45 42 server := NewServer(443, feeder, feedHost, feedDidBase) ··· 51 48 52 49 server.Run() 53 50 } 51 + 52 + func consumeLoop(ctx context.Context, jsServerAddr string, feeder *FeedGenerator) { 53 + consumer := NewConsumer(jsServerAddr) 54 + for { 55 + err := consumer.Consume(ctx, feeder, slog.Default()) 56 + if err != nil { 57 + if errors.Is(err, context.Canceled) { 58 + return 59 + } 60 + 61 + if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) { 62 + slog.Error("consume - trying again", "error", err) 63 + continue 64 + } 65 + slog.Error("consume - exiting gracefully", "error", err) 66 + return 67 + } 68 + } 69 + }
+2 -2
server.go
··· 56 56 57 57 type FeedReponse struct { 58 58 Cursor string `json:"cursor"` 59 - Feed []FeedItme `json:"feed"` 59 + Feed []FeedItem `json:"feed"` 60 60 } 61 61 62 - type FeedItme struct { 62 + type FeedItem struct { 63 63 Post string `json:"post"` 64 64 FeedContext string `json:"feedContext"` 65 65 }