this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

tidy

dholms 424da82b 7657a100

+62 -43
+14 -24
nexus/firehose.go
··· 3 3 import ( 4 4 "context" 5 5 "fmt" 6 - "log/slog" 7 6 "net/http" 8 7 "net/url" 9 8 ··· 17 16 "github.com/gorilla/websocket" 18 17 ) 19 18 20 - func (nexus *Nexus) SubscribeFirehose(ctx context.Context) error { 19 + func (n *Nexus) SubscribeFirehose(ctx context.Context) error { 21 20 relayHost := "https://bsky.network" 22 21 23 22 dialer := websocket.DefaultDialer ··· 40 39 41 40 rsc := &events.RepoStreamCallbacks{ 42 41 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 43 - return nexus.handleCommitEvent(ctx, evt) 42 + return n.handleCommitEvent(ctx, evt) 44 43 }, 45 44 RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error { 46 45 return nil ··· 59 58 relayHost, 60 59 rsc.EventHandler, 61 60 ) 62 - slog.Info("starting firehose consumer", "relayHost", relayHost) 63 - err = events.HandleRepoStream(ctx, con, scheduler, nil) 64 - 65 - if err != nil { 66 - return err 67 - } 61 + n.logger.Info("starting firehose consumer", "relayHost", relayHost) 68 62 return events.HandleRepoStream(ctx, con, scheduler, nil) 69 63 } 70 64 71 - func (nexus *Nexus) handleCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 72 - nexus.mu.RLock() 73 - exists := nexus.filterDids[evt.Repo] 74 - nexus.mu.RUnlock() 75 - 76 - if !exists { 65 + func (n *Nexus) handleCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 66 + if !n.filter.Contains(evt.Repo) { 77 67 return nil 78 68 } 79 69 80 70 r, err := repo.VerifyCommitMessage(ctx, evt) 81 71 if err != nil { 82 - nexus.logger.Info("failed to verify commit", "did", evt.Repo, "err", err) 72 + n.logger.Info("failed to verify commit", "did", evt.Repo, "error", err) 83 73 return err 84 74 } 85 75 ··· 100 90 Action: "delete", 101 91 } 102 92 103 - if err := nexus.outbox.Send(outOp); err != nil { 93 + if err := n.outbox.Send(outOp); err != nil { 104 94 return err 105 95 } 106 96 107 - if err := nexus.db.Where("did = ? AND collection = ? AND rkey = ?", evt.Repo, collStr, rkeyStr).Delete(&models.RepoRecord{}).Error; err != nil { 108 - nexus.logger.Error("failed to delete repo record", "error", err, "did", evt.Repo, "path", op.Path) 97 + if err := n.db.Where("did = ? AND collection = ? AND rkey = ?", evt.Repo, collStr, rkeyStr).Delete(&models.RepoRecord{}).Error; err != nil { 98 + n.logger.Error("failed to delete repo record", "did", evt.Repo, "path", op.Path, "error", err) 109 99 } 110 100 continue 111 101 } ··· 130 120 Cid: cidStr, 131 121 } 132 122 133 - if err := nexus.outbox.Send(outOp); err != nil { 123 + if err := n.outbox.Send(outOp); err != nil { 134 124 return err 135 125 } 136 126 ··· 141 131 Cid: cidStr, 142 132 Rev: evt.Rev, 143 133 } 144 - if err := nexus.db.Save(&repoRecord).Error; err != nil { 145 - nexus.logger.Error("failed to save repo record", "error", err, "did", evt.Repo, "path", op.Path) 134 + if err := n.db.Save(&repoRecord).Error; err != nil { 135 + n.logger.Error("failed to save repo record", "did", evt.Repo, "path", op.Path, "error", err) 146 136 } 147 137 } 148 138 149 - if err := nexus.UpdateRepoState(evt.Repo, models.RepoStateActive, evt.Rev, ""); err != nil { 150 - nexus.logger.Error("failed to update rev", "did", evt.Repo, "error", err) 139 + if err := n.UpdateRepoState(evt.Repo, models.RepoStateActive, evt.Rev, ""); err != nil { 140 + n.logger.Error("failed to update rev", "did", evt.Repo, "error", err) 151 141 } 152 142 153 143 return nil
+2 -6
nexus/handlers.go
··· 21 21 } 22 22 23 23 func (n *Nexus) handleHealthcheck(c echo.Context) error { 24 - return c.JSON(200, map[string]string{ 24 + return c.JSON(http.StatusOK, map[string]string{ 25 25 "status": "ok", 26 26 }) 27 27 } ··· 63 63 return echo.NewHTTPError(http.StatusInternalServerError) 64 64 } 65 65 66 - n.mu.Lock() 67 - for _, did := range payload.DIDs { 68 - n.filterDids[did] = true 69 - } 70 - n.mu.Unlock() 66 + n.filter.AddBatch(payload.DIDs) 71 67 72 68 for _, did := range payload.DIDs { 73 69 n.queueBackfill(did)
+11 -13
nexus/nexus.go
··· 3 3 import ( 4 4 "context" 5 5 "log/slog" 6 - "sync" 7 6 "time" 8 7 9 8 "github.com/bluesky-social/indigo/atproto/identity" ··· 18 17 echo *echo.Echo 19 18 logger *slog.Logger 20 19 21 - filterDids map[string]bool // DID -> exists (for quick filtering) 22 - mu sync.RWMutex 23 - 24 - Dir identity.Directory 20 + filter *StringSet 21 + Dir identity.Directory 25 22 26 23 outbox *Outbox 27 24 backfillQueue *BackfillQueue ··· 65 62 echo: e, 66 63 logger: slog.Default().With("system", "nexus"), 67 64 68 - filterDids: make(map[string]bool), 69 - 70 - Dir: &cdir, 65 + filter: NewStringSet(), 66 + Dir: &cdir, 71 67 72 68 outbox: NewOutbox(db), 73 69 backfillQueue: NewBackfillQueue(), ··· 75 71 76 72 // run 50 backfill workers 77 73 for i := 0; i < 50; i++ { 78 - go n.runBackfillWorker(i) 74 + go n.runBackfillWorker(context.Background(), i) 79 75 } 80 76 81 77 err = n.LoadFilters() ··· 119 115 return err 120 116 } 121 117 118 + dids := make([]string, 0, len(filterDids)) 122 119 for _, f := range filterDids { 123 - n.filterDids[f.Did] = true 120 + dids = append(dids, f.Did) 124 121 125 122 if f.State == models.RepoStatePending || f.State == models.RepoStateBackfilling { 126 123 n.queueBackfill(f.Did) 127 124 } 128 125 } 129 126 127 + n.filter.AddBatch(dids) 130 128 return nil 131 129 } 132 130 133 - func (n *Nexus) runBackfillWorker(workerID int) { 131 + func (n *Nexus) runBackfillWorker(ctx context.Context, workerID int) { 134 132 logger := n.logger.With("worker", workerID) 135 133 136 134 for { 137 135 did := n.backfillQueue.Dequeue() 138 136 logger.Info("processing backfill", "did", did) 139 - err := n.backfillDid(context.Background(), did) 137 + err := n.backfillDid(ctx, did) 140 138 if err != nil { 141 - logger.Error("error backfilling did", "error", err, "did", did) 139 + logger.Error("backfill failed", "did", did, "error", err) 142 140 } 143 141 } 144 142 }
+35
nexus/stringset.go
··· 1 + package main 2 + 3 + import "sync" 4 + 5 + type StringSet struct { 6 + items map[string]bool 7 + mu sync.RWMutex 8 + } 9 + 10 + func NewStringSet() *StringSet { 11 + return &StringSet{ 12 + items: make(map[string]bool), 13 + } 14 + } 15 + 16 + func (s *StringSet) Add(item string) { 17 + s.mu.Lock() 18 + s.items[item] = true 19 + s.mu.Unlock() 20 + } 21 + 22 + func (s *StringSet) AddBatch(items []string) { 23 + s.mu.Lock() 24 + for _, item := range items { 25 + s.items[item] = true 26 + } 27 + s.mu.Unlock() 28 + } 29 + 30 + func (s *StringSet) Contains(item string) bool { 31 + s.mu.RLock() 32 + exists := s.items[item] 33 + s.mu.RUnlock() 34 + return exists 35 + }