this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Update sonar a little bit (#625)

authored by

Jaz and committed by
GitHub
a2e24bb9 f50c8520

+23 -38
-14
cmd/sonar/docker-compose.yml
··· 8 8 restart: always 9 9 ports: 10 10 - "8345:8345" 11 - sonar-sandbox: 12 - build: 13 - context: ../../ 14 - dockerfile: cmd/sonar/Dockerfile 15 - image: atproto-sonar 16 - restart: always 17 - ports: 18 - - "8346:8345" 19 - command: 20 - [ 21 - "./sonar", 22 - "--ws-url", 23 - "wss://bgs.bsky-sandbox.dev/xrpc/com.atproto.sync.subscribeRepos", 24 - ]
+4 -13
cmd/sonar/main.go
··· 14 14 "time" 15 15 16 16 "github.com/bluesky-social/indigo/events" 17 - "github.com/bluesky-social/indigo/events/schedulers/autoscaling" 17 + "github.com/bluesky-social/indigo/events/schedulers/sequential" 18 18 "github.com/bluesky-social/indigo/sonar" 19 19 "github.com/gorilla/websocket" 20 20 "github.com/prometheus/client_golang/prometheus/promhttp" ··· 35 35 &cli.StringFlag{ 36 36 Name: "ws-url", 37 37 Usage: "full websocket path to the ATProto SubscribeRepos XRPC endpoint", 38 - Value: "wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos", 38 + Value: "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos", 39 39 }, 40 40 &cli.StringFlag{ 41 41 Name: "log-level", ··· 46 46 Name: "port", 47 47 Usage: "listen port for metrics server", 48 48 Value: 8345, 49 - }, 50 - &cli.IntFlag{ 51 - Name: "worker-count", 52 - Usage: "number of workers to process events", 53 - Value: 10, 54 49 }, 55 50 &cli.IntFlag{ 56 51 Name: "max-queue-size", ··· 102 97 103 98 wg := sync.WaitGroup{} 104 99 105 - scalingSettings := autoscaling.DefaultAutoscaleSettings() 106 - scalingSettings.MaxConcurrency = cctx.Int("worker-count") 107 - scalingSettings.AutoscaleFrequency = time.Second 108 - 109 - pool := autoscaling.NewScheduler(scalingSettings, u.Host, s.HandleStreamEvent) 100 + pool := sequential.NewScheduler(u.Host, s.HandleStreamEvent) 110 101 111 102 // Start a goroutine to manage the cursor file, saving the current cursor every 5 seconds. 112 103 go func() { ··· 191 182 192 183 logger.Info("connecting to WebSocket", "url", u.String()) 193 184 c, _, err := websocket.DefaultDialer.Dial(u.String(), http.Header{ 194 - "User-Agent": []string{"sonar/1.0"}, 185 + "User-Agent": []string{"sonar/1.1"}, 195 186 }) 196 187 if err != nil { 197 188 logger.Info("failed to connect to websocket", "err", err)
+1 -1
sonar/metrics.go
··· 19 19 var recordsProcessedCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 20 20 Name: "sonar_records_processed_total", 21 21 Help: "The total number of records processed by Sonar", 22 - }, []string{"record_type", "socket_url"}) 22 + }, []string{"action", "socket_url", "record_type"}) 23 23 24 24 var quoteRepostsProcessedCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 25 25 Name: "sonar_quote_reposts_processed_total",
+18 -10
sonar/sonar.go
··· 130 130 eventsProcessedCounter.WithLabelValues("identity", s.SocketURL).Inc() 131 131 now := time.Now() 132 132 s.ProgMux.Lock() 133 - s.Progress.LastSeq = xe.RepoHandle.Seq 133 + s.Progress.LastSeq = xe.RepoIdentity.Seq 134 134 s.Progress.LastSeqProcessedAt = now 135 135 s.ProgMux.Unlock() 136 136 case xe.RepoInfo != nil: ··· 226 226 break 227 227 } 228 228 229 + labelValues := []string{op.Action, s.SocketURL} 230 + 229 231 var recCreatedAt time.Time 230 232 var parseError error 231 233 232 234 // Unpack the record and process it 233 235 switch rec := rec.(type) { 234 236 case *bsky.FeedPost: 235 - recordsProcessedCounter.WithLabelValues("feed_post", s.SocketURL).Inc() 237 + labelValues = append(labelValues, "feed_post") 236 238 if rec.Embed != nil && rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil { 237 239 quoteRepostsProcessedCounter.WithLabelValues(s.SocketURL).Inc() 238 240 } 239 241 recCreatedAt, parseError = dateparse.ParseAny(rec.CreatedAt) 240 242 case *bsky.FeedLike: 241 - recordsProcessedCounter.WithLabelValues("feed_like", s.SocketURL).Inc() 243 + labelValues = append(labelValues, "feed_like") 242 244 recCreatedAt, parseError = dateparse.ParseAny(rec.CreatedAt) 243 245 case *bsky.FeedRepost: 244 - recordsProcessedCounter.WithLabelValues("feed_repost", s.SocketURL).Inc() 246 + labelValues = append(labelValues, "feed_repost") 245 247 recCreatedAt, parseError = dateparse.ParseAny(rec.CreatedAt) 246 248 case *bsky.GraphBlock: 247 - recordsProcessedCounter.WithLabelValues("graph_block", s.SocketURL).Inc() 249 + labelValues = append(labelValues, "graph_block") 248 250 recCreatedAt, parseError = dateparse.ParseAny(rec.CreatedAt) 249 251 case *bsky.GraphFollow: 250 - recordsProcessedCounter.WithLabelValues("graph_follow", s.SocketURL).Inc() 252 + labelValues = append(labelValues, "graph_follow") 251 253 recCreatedAt, parseError = dateparse.ParseAny(rec.CreatedAt) 252 254 case *bsky.ActorProfile: 253 - recordsProcessedCounter.WithLabelValues("actor_profile", s.SocketURL).Inc() 255 + labelValues = append(labelValues, "actor_profile") 254 256 case *bsky.FeedGenerator: 255 - recordsProcessedCounter.WithLabelValues("feed_generator", s.SocketURL).Inc() 257 + labelValues = append(labelValues, "feed_generator") 256 258 recCreatedAt, parseError = dateparse.ParseAny(rec.CreatedAt) 257 259 case *bsky.GraphList: 258 - recordsProcessedCounter.WithLabelValues("graph_list", s.SocketURL).Inc() 260 + labelValues = append(labelValues, "graph_list") 259 261 recCreatedAt, parseError = dateparse.ParseAny(rec.CreatedAt) 260 262 case *bsky.GraphListitem: 261 - recordsProcessedCounter.WithLabelValues("graph_listitem", s.SocketURL).Inc() 263 + labelValues = append(labelValues, "graph_listitem") 264 + recCreatedAt, parseError = dateparse.ParseAny(rec.CreatedAt) 265 + case *bsky.FeedThreadgate: 266 + labelValues = append(labelValues, "feed_threadgate") 262 267 recCreatedAt, parseError = dateparse.ParseAny(rec.CreatedAt) 268 + case *bsky.LabelerService: 269 + labelValues = append(labelValues, "labeler_service") 263 270 default: 264 271 log.Warn("unknown record type", "rec", rec) 265 272 } 273 + recordsProcessedCounter.WithLabelValues(labelValues...).Inc() 266 274 if parseError != nil { 267 275 s.Logger.Error("error parsing time", "err", parseError) 268 276 continue