Select the types of activity you want to include in your feed.
Track socket connection URL in Sonar metrics (#221)
This should make it easier to filter dashboards between environments and
eventually allow for sonar to connect to multiple sockets at a time and
emit distinct metrics from each socket.
···99var eventsProcessedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
1010 Name: "sonar_events_processed_total",
1111 Help: "The total number of firehose events processed by Sonar",
1212-}, []string{"event_type"})
1212+}, []string{"event_type", "socket_url"})
13131414-var rebasesProcessedCounter = promauto.NewCounter(prometheus.CounterOpts{
1414+var rebasesProcessedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
1515 Name: "sonar_rebases_processed_total",
1616 Help: "The total number of rebase operations processed by Sonar",
1717-})
1717+}, []string{"socket_url"})
18181919var recordsProcessedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
2020 Name: "sonar_records_processed_total",
2121 Help: "The total number of records processed by Sonar",
2222-}, []string{"record_type"})
2222+}, []string{"record_type", "socket_url"})
23232424-var quoteRepostsProcessedCounter = promauto.NewCounter(prometheus.CounterOpts{
2424+var quoteRepostsProcessedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
2525 Name: "sonar_quote_reposts_processed_total",
2626 Help: "The total number quote repost operations processed by Sonar",
2727-})
2727+}, []string{"socket_url"})
28282929var opsProcessedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
3030 Name: "sonar_ops_processed_total",
3131 Help: "The total number of repo operations processed by Sonar",
3232-}, []string{"kind", "op_path"})
3232+}, []string{"kind", "op_path", "socket_url"})
33333434// Initialize Prometheus metrics for duration of processing events
3535-var eventProcessingDurationHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
3535+var eventProcessingDurationHistogram = promauto.NewHistogramVec(prometheus.HistogramOpts{
3636 Name: "sonar_event_processing_duration_seconds",
3737 Help: "The amount of time it takes to process a firehose event",
3838 Buckets: prometheus.ExponentialBuckets(0.001, 2, 15),
3939-})
3939+}, []string{"socket_url"})
40404141-var lastSeqGauge = promauto.NewGauge(prometheus.GaugeOpts{
4141+var lastSeqGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
4242 Name: "sonar_last_seq",
4343 Help: "The last sequence number processed",
4444-})
4444+}, []string{"socket_url"})
45454646-var lastSeqProcessedAtGauge = promauto.NewGauge(prometheus.GaugeOpts{
4646+var lastSeqProcessedAtGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
4747 Name: "sonar_last_seq_processed_at",
4848 Help: "The timestamp of the last sequence number processed",
4949-})
4949+}, []string{"socket_url"})
50505151-var lastSeqCreatedAtGauge = promauto.NewGauge(prometheus.GaugeOpts{
5151+var lastSeqCreatedAtGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
5252 Name: "sonar_last_seq_created_at",
5353 Help: "The timestamp of the last sequence number created",
5454-})
5454+}, []string{"socket_url"})
55555656-var lastSeqCommittedAtGauge = promauto.NewGauge(prometheus.GaugeOpts{
5656+var lastSeqCommittedAtGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
5757 Name: "sonar_last_seq_committed_at",
5858 Help: "The commit timestamp of the last sequence number processed",
5959-})
5959+}, []string{"socket_url"})
+42-40
sonar/sonar.go
···2424)
25252626type Sonar struct {
2727+ SocketURL string
2728 Progress *Progress
2829 ProgMux sync.Mutex
2930 Logger *zap.SugaredLogger
···7172 return nil
7273}
73747474-func NewSonar(logger *zap.SugaredLogger, cursorFile string) (*Sonar, error) {
7575+func NewSonar(logger *zap.SugaredLogger, cursorFile string, socketURL string) (*Sonar, error) {
7576 s := Sonar{
7777+ SocketURL: socketURL,
7678 Progress: &Progress{
7779 LastSeq: -1,
7880 },
···106108107109 switch {
108110 case xe.RepoCommit != nil:
109109- eventsProcessedCounter.WithLabelValues("repo_commit").Inc()
111111+ eventsProcessedCounter.WithLabelValues("repo_commit", s.SocketURL).Inc()
110112 return s.HandleRepoCommit(ctx, xe.RepoCommit)
111113 case xe.RepoHandle != nil:
112112- eventsProcessedCounter.WithLabelValues("repo_handle").Inc()
114114+ eventsProcessedCounter.WithLabelValues("repo_handle", s.SocketURL).Inc()
113115 now := time.Now()
114116 s.ProgMux.Lock()
115117 s.Progress.LastSeq = xe.RepoHandle.Seq
···121123 log.Errorf("error parsing time: %+v", err)
122124 return nil
123125 }
124124- lastSeqCommittedAtGauge.Set(float64(t.UnixNano()))
125125- lastSeqProcessedAtGauge.Set(float64(now.UnixNano()))
126126- lastSeqGauge.Set(float64(xe.RepoHandle.Seq))
126126+ lastSeqCommittedAtGauge.WithLabelValues(s.SocketURL).Set(float64(t.UnixNano()))
127127+ lastSeqProcessedAtGauge.WithLabelValues(s.SocketURL).Set(float64(now.UnixNano()))
128128+ lastSeqGauge.WithLabelValues(s.SocketURL).Set(float64(xe.RepoHandle.Seq))
127129 case xe.RepoInfo != nil:
128128- eventsProcessedCounter.WithLabelValues("repo_info").Inc()
130130+ eventsProcessedCounter.WithLabelValues("repo_info", s.SocketURL).Inc()
129131 case xe.RepoMigrate != nil:
130130- eventsProcessedCounter.WithLabelValues("repo_migrate").Inc()
132132+ eventsProcessedCounter.WithLabelValues("repo_migrate", s.SocketURL).Inc()
131133 now := time.Now()
132134 s.ProgMux.Lock()
133135 s.Progress.LastSeq = xe.RepoMigrate.Seq
···139141 log.Errorf("error parsing time: %+v", err)
140142 return nil
141143 }
142142- lastSeqCommittedAtGauge.Set(float64(t.UnixNano()))
143143- lastSeqProcessedAtGauge.Set(float64(now.UnixNano()))
144144- lastSeqGauge.Set(float64(xe.RepoHandle.Seq))
144144+ lastSeqCommittedAtGauge.WithLabelValues(s.SocketURL).Set(float64(t.UnixNano()))
145145+ lastSeqProcessedAtGauge.WithLabelValues(s.SocketURL).Set(float64(now.UnixNano()))
146146+ lastSeqGauge.WithLabelValues(s.SocketURL).Set(float64(xe.RepoHandle.Seq))
145147 case xe.RepoTombstone != nil:
146146- eventsProcessedCounter.WithLabelValues("repo_tombstone").Inc()
148148+ eventsProcessedCounter.WithLabelValues("repo_tombstone", s.SocketURL).Inc()
147149 case xe.LabelInfo != nil:
148148- eventsProcessedCounter.WithLabelValues("label_info").Inc()
150150+ eventsProcessedCounter.WithLabelValues("label_info", s.SocketURL).Inc()
149151 case xe.LabelLabels != nil:
150150- eventsProcessedCounter.WithLabelValues("label_labels").Inc()
152152+ eventsProcessedCounter.WithLabelValues("label_labels", s.SocketURL).Inc()
151153 case xe.Error != nil:
152152- eventsProcessedCounter.WithLabelValues("error").Inc()
154154+ eventsProcessedCounter.WithLabelValues("error", s.SocketURL).Inc()
153155 }
154156 return nil
155157}
···165167 s.Progress.LastSeqProcessedAt = start
166168 s.ProgMux.Unlock()
167169168168- lastSeqGauge.Set(float64(evt.Seq))
170170+ lastSeqGauge.WithLabelValues(s.SocketURL).Set(float64(evt.Seq))
169171170172 log := s.Logger.With("repo", evt.Repo, "seq", evt.Seq, "commit", evt.Commit)
171173···177179178180 if evt.Rebase {
179181 log.Debug("rebase")
180180- rebasesProcessedCounter.Inc()
182182+ rebasesProcessedCounter.WithLabelValues(s.SocketURL).Inc()
181183 }
182184183185 // Parse time from the event time string
···187189 return nil
188190 }
189191190190- lastSeqCommittedAtGauge.Set(float64(t.UnixNano()))
191191- lastSeqProcessedAtGauge.Set(float64(start.UnixNano()))
192192+ lastSeqCommittedAtGauge.WithLabelValues(s.SocketURL).Set(float64(t.UnixNano()))
193193+ lastSeqProcessedAtGauge.WithLabelValues(s.SocketURL).Set(float64(start.UnixNano()))
192194193195 for _, op := range evt.Ops {
194196 collection := strings.Split(op.Path, "/")[0]
···196198 ek := repomgr.EventKind(op.Action)
197199 log = log.With("action", op.Action, "collection", collection)
198200199199- opsProcessedCounter.WithLabelValues(op.Action, collection).Inc()
201201+ opsProcessedCounter.WithLabelValues(op.Action, collection, s.SocketURL).Inc()
200202201203 switch ek {
202204 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord:
···218220 // Unpack the record and process it
219221 switch rec := rec.(type) {
220222 case *bsky.FeedPost:
221221- recordsProcessedCounter.WithLabelValues("feed_post").Inc()
223223+ recordsProcessedCounter.WithLabelValues("feed_post", s.SocketURL).Inc()
222224 if rec.Embed != nil && rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil {
223223- quoteRepostsProcessedCounter.Inc()
225225+ quoteRepostsProcessedCounter.WithLabelValues(s.SocketURL).Inc()
224226 }
225227 // Parse time from the event time string
226228 recCreatedAt, err := time.Parse(util.ISO8601, evt.Time)
···228230 log.Errorf("error parsing time: %+v", err)
229231 continue
230232 }
231231- lastSeqCreatedAtGauge.Set(float64(recCreatedAt.UnixNano()))
233233+ lastSeqCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(recCreatedAt.UnixNano()))
232234 case *bsky.FeedLike:
233233- recordsProcessedCounter.WithLabelValues("feed_like").Inc()
235235+ recordsProcessedCounter.WithLabelValues("feed_like", s.SocketURL).Inc()
234236 // Parse time from the event time string
235237 recCreatedAt, err := time.Parse(util.ISO8601, evt.Time)
236238 if err != nil {
237239 log.Errorf("error parsing time: %+v", err)
238240 continue
239241 }
240240- lastSeqCreatedAtGauge.Set(float64(recCreatedAt.UnixNano()))
242242+ lastSeqCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(recCreatedAt.UnixNano()))
241243 case *bsky.FeedRepost:
242242- recordsProcessedCounter.WithLabelValues("feed_repost").Inc()
244244+ recordsProcessedCounter.WithLabelValues("feed_repost", s.SocketURL).Inc()
243245 // Parse time from the event time string
244246 recCreatedAt, err := time.Parse(util.ISO8601, evt.Time)
245247 if err != nil {
246248 log.Errorf("error parsing time: %+v", err)
247249 continue
248250 }
249249- lastSeqCreatedAtGauge.Set(float64(recCreatedAt.UnixNano()))
251251+ lastSeqCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(recCreatedAt.UnixNano()))
250252 case *bsky.GraphBlock:
251251- recordsProcessedCounter.WithLabelValues("graph_block").Inc()
253253+ recordsProcessedCounter.WithLabelValues("graph_block", s.SocketURL).Inc()
252254 // Parse time from the event time string
253255 recCreatedAt, err := time.Parse(util.ISO8601, evt.Time)
254256 if err != nil {
255257 log.Errorf("error parsing time: %+v", err)
256258 continue
257259 }
258258- lastSeqCreatedAtGauge.Set(float64(recCreatedAt.UnixNano()))
260260+ lastSeqCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(recCreatedAt.UnixNano()))
259261 case *bsky.GraphFollow:
260260- recordsProcessedCounter.WithLabelValues("graph_follow").Inc()
262262+ recordsProcessedCounter.WithLabelValues("graph_follow", s.SocketURL).Inc()
261263 // Parse time from the event time string
262264 recCreatedAt, err := time.Parse(util.ISO8601, evt.Time)
263265 if err != nil {
264266 log.Errorf("error parsing time: %+v", err)
265267 continue
266268 }
267267- lastSeqCreatedAtGauge.Set(float64(recCreatedAt.UnixNano()))
269269+ lastSeqCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(recCreatedAt.UnixNano()))
268270 case *bsky.ActorProfile:
269269- recordsProcessedCounter.WithLabelValues("actor_profile").Inc()
271271+ recordsProcessedCounter.WithLabelValues("actor_profile", s.SocketURL).Inc()
270272 // Parse time from the event time string
271273 recCreatedAt, err := time.Parse(util.ISO8601, evt.Time)
272274 if err != nil {
273275 log.Errorf("error parsing time: %+v", err)
274276 continue
275277 }
276276- lastSeqCreatedAtGauge.Set(float64(recCreatedAt.UnixNano()))
278278+ lastSeqCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(recCreatedAt.UnixNano()))
277279 case *bsky.FeedGenerator:
278278- recordsProcessedCounter.WithLabelValues("feed_generator").Inc()
280280+ recordsProcessedCounter.WithLabelValues("feed_generator", s.SocketURL).Inc()
279281 // Parse time from the event time string
280282 recCreatedAt, err := time.Parse(util.ISO8601, evt.Time)
281283 if err != nil {
282284 log.Errorf("error parsing time: %+v", err)
283285 continue
284286 }
285285- lastSeqCreatedAtGauge.Set(float64(recCreatedAt.UnixNano()))
287287+ lastSeqCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(recCreatedAt.UnixNano()))
286288 case *bsky.GraphList:
287287- recordsProcessedCounter.WithLabelValues("graph_list").Inc()
289289+ recordsProcessedCounter.WithLabelValues("graph_list", s.SocketURL).Inc()
288290 // Parse time from the event time string
289291 recCreatedAt, err := time.Parse(util.ISO8601, evt.Time)
290292 if err != nil {
291293 log.Errorf("error parsing time: %+v", err)
292294 continue
293295 }
294294- lastSeqCreatedAtGauge.Set(float64(recCreatedAt.UnixNano()))
296296+ lastSeqCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(recCreatedAt.UnixNano()))
295297 case *bsky.GraphListitem:
296296- recordsProcessedCounter.WithLabelValues("graph_listitem").Inc()
298298+ recordsProcessedCounter.WithLabelValues("graph_listitem", s.SocketURL).Inc()
297299 // Parse time from the event time string
298300 recCreatedAt, err := time.Parse(util.ISO8601, evt.Time)
299301 if err != nil {
300302 log.Errorf("error parsing time: %+v", err)
301303 continue
302304 }
303303- lastSeqCreatedAtGauge.Set(float64(recCreatedAt.UnixNano()))
305305+ lastSeqCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(recCreatedAt.UnixNano()))
304306 default:
305307 log.Warnf("unknown record type: %+v", rec)
306308 }
···311313 }
312314 }
313315314314- eventProcessingDurationHistogram.Observe(time.Since(start).Seconds())
316316+ eventProcessingDurationHistogram.WithLabelValues(s.SocketURL).Observe(time.Since(start).Seconds())
315317 return nil
316318}