···40404141var lastSeqGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
4242 Name: "sonar_last_seq",
4343- Help: "The last sequence number processed",
4343+ Help: "The sequence number of the last event processed",
4444+}, []string{"socket_url"})
4545+4646+var lastEvtProcessedAtGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
4747+ Name: "sonar_last_evt_processed_at",
4848+ Help: "The timestamp of the last event processed",
4449}, []string{"socket_url"})
45504646-var lastSeqProcessedAtGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
4747- Name: "sonar_last_seq_processed_at",
4848- Help: "The timestamp of the last sequence number processed",
5151+var lastEvtCreatedAtGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
5252+ Name: "sonar_last_evt_created_at",
5353+ Help: "The timestamp of the last event created",
5454+}, []string{"socket_url"})
5555+5656+var lastRecordCreatedAtGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
5757+ Name: "sonar_last_record_created_at",
5858+ Help: "The timestamp of the last record processed",
5959+}, []string{"socket_url"})
6060+6161+var lastEvtCreatedRecordCreatedGapGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
6262+ Name: "sonar_last_evt_created_record_created_gap",
6363+ Help: "The gap between the last event's event timestamp and it's record timestamp",
4964}, []string{"socket_url"})
50655151-var lastSeqCreatedAtGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
5252- Name: "sonar_last_seq_created_at",
5353- Help: "The timestamp of the last sequence number created",
6666+var lastEvtCreatedEvtProcessedGapGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
6767+ Name: "sonar_last_evt_created_evt_processed_gap",
6868+ Help: "The gap between the last event's event timestamp and when it was processed by sonar",
5469}, []string{"socket_url"})
55705656-var lastSeqCommittedAtGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
5757- Name: "sonar_last_seq_committed_at",
5858- Help: "The commit timestamp of the last sequence number processed",
7171+var lastRecordCreatedEvtProcessedGapGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
7272+ Name: "sonar_last_record_created_evt_processed_gap",
7373+ Help: "The gap between the last record's record timestamp and when it was processed by sonar",
5974}, []string{"socket_url"})
+37-77
sonar/sonar.go
···99 "sync"
1010 "time"
11111212+ "github.com/araddon/dateparse"
1213 comatproto "github.com/bluesky-social/indigo/api/atproto"
1314 "github.com/bluesky-social/indigo/api/bsky"
1415 lexutil "github.com/bluesky-social/indigo/lex/util"
1515- "github.com/bluesky-social/indigo/util"
1616 "github.com/goccy/go-json"
1717 "github.com/labstack/gommon/log"
1818···123123 log.Errorf("error parsing time: %+v", err)
124124 return nil
125125 }
126126- lastSeqCommittedAtGauge.WithLabelValues(s.SocketURL).Set(float64(t.UnixNano()))
127127- lastSeqProcessedAtGauge.WithLabelValues(s.SocketURL).Set(float64(now.UnixNano()))
126126+ lastEvtCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(t.UnixNano()))
127127+ lastEvtProcessedAtGauge.WithLabelValues(s.SocketURL).Set(float64(now.UnixNano()))
128128+ lastEvtCreatedEvtProcessedGapGauge.WithLabelValues(s.SocketURL).Set(float64(now.Sub(t).Seconds()))
128129 lastSeqGauge.WithLabelValues(s.SocketURL).Set(float64(xe.RepoHandle.Seq))
129130 case xe.RepoInfo != nil:
130131 eventsProcessedCounter.WithLabelValues("repo_info", s.SocketURL).Inc()
···141142 log.Errorf("error parsing time: %+v", err)
142143 return nil
143144 }
144144- lastSeqCommittedAtGauge.WithLabelValues(s.SocketURL).Set(float64(t.UnixNano()))
145145- lastSeqProcessedAtGauge.WithLabelValues(s.SocketURL).Set(float64(now.UnixNano()))
145145+ lastEvtCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(t.UnixNano()))
146146+ lastEvtProcessedAtGauge.WithLabelValues(s.SocketURL).Set(float64(now.UnixNano()))
147147+ lastEvtCreatedEvtProcessedGapGauge.WithLabelValues(s.SocketURL).Set(float64(now.Sub(t).Seconds()))
146148 lastSeqGauge.WithLabelValues(s.SocketURL).Set(float64(xe.RepoHandle.Seq))
147149 case xe.RepoTombstone != nil:
148150 eventsProcessedCounter.WithLabelValues("repo_tombstone", s.SocketURL).Inc()
···160162 ctx, span := otel.Tracer("sonar").Start(ctx, "HandleRepoCommit")
161163 defer span.End()
162164163163- start := time.Now()
165165+ processedAt := time.Now()
164166165167 s.ProgMux.Lock()
166168 s.Progress.LastSeq = evt.Seq
167167- s.Progress.LastSeqProcessedAt = start
169169+ s.Progress.LastSeqProcessedAt = processedAt
168170 s.ProgMux.Unlock()
169171170172 lastSeqGauge.WithLabelValues(s.SocketURL).Set(float64(evt.Seq))
···173175174176 rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
175177 if err != nil {
176176- log.Errorf("failed to read repo from car: %+v\n", err)
178178+ log.Errorf("failed to read repo from car: %+v", err)
177179 return nil
178180 }
179181···183185 }
184186185187 // Parse time from the event time string
186186- t, err := time.Parse(time.RFC3339, evt.Time)
188188+ evtCreatedAt, err := time.Parse(time.RFC3339, evt.Time)
187189 if err != nil {
188190 log.Errorf("error parsing time: %+v", err)
189191 return nil
190192 }
191193192192- lastSeqCommittedAtGauge.WithLabelValues(s.SocketURL).Set(float64(t.UnixNano()))
193193- lastSeqProcessedAtGauge.WithLabelValues(s.SocketURL).Set(float64(start.UnixNano()))
194194+ lastEvtCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(evtCreatedAt.UnixNano()))
195195+ lastEvtProcessedAtGauge.WithLabelValues(s.SocketURL).Set(float64(processedAt.UnixNano()))
196196+ lastEvtCreatedEvtProcessedGapGauge.WithLabelValues(s.SocketURL).Set(float64(processedAt.Sub(evtCreatedAt).Seconds()))
194197195198 for _, op := range evt.Ops {
196199 collection := strings.Split(op.Path, "/")[0]
···206209 rc, rec, err := rr.GetRecord(ctx, op.Path)
207210 if err != nil {
208211 e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err)
209209- log.Errorf("failed to get a record from the event: %+v\n", e)
212212+ log.Errorf("failed to get a record from the event: %+v", e)
210213 break
211214 }
212215213216 // Verify that the record cid matches the cid in the event
214217 if lexutil.LexLink(rc) != *op.Cid {
215218 e := fmt.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid)
216216- log.Errorf("failed to LexLink the record in the event: %+v\n", e)
219219+ log.Errorf("failed to LexLink the record in the event: %+v", e)
217220 break
218221 }
219222223223+ var recCreatedAt time.Time
224224+ var parseError error
225225+220226 // Unpack the record and process it
221227 switch rec := rec.(type) {
222228 case *bsky.FeedPost:
···224230 if rec.Embed != nil && rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil {
225231 quoteRepostsProcessedCounter.WithLabelValues(s.SocketURL).Inc()
226232 }
227227- // Parse time from the event time string
228228- recCreatedAt, err := time.Parse(util.ISO8601, evt.Time)
229229- if err != nil {
230230- log.Errorf("error parsing time: %+v", err)
231231- continue
232232- }
233233- lastSeqCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(recCreatedAt.UnixNano()))
233233+ recCreatedAt, parseError = dateparse.ParseAny(rec.CreatedAt)
234234 case *bsky.FeedLike:
235235 recordsProcessedCounter.WithLabelValues("feed_like", s.SocketURL).Inc()
236236- // Parse time from the event time string
237237- recCreatedAt, err := time.Parse(util.ISO8601, evt.Time)
238238- if err != nil {
239239- log.Errorf("error parsing time: %+v", err)
240240- continue
241241- }
242242- lastSeqCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(recCreatedAt.UnixNano()))
236236+ recCreatedAt, parseError = dateparse.ParseAny(rec.CreatedAt)
243237 case *bsky.FeedRepost:
244238 recordsProcessedCounter.WithLabelValues("feed_repost", s.SocketURL).Inc()
245245- // Parse time from the event time string
246246- recCreatedAt, err := time.Parse(util.ISO8601, evt.Time)
247247- if err != nil {
248248- log.Errorf("error parsing time: %+v", err)
249249- continue
250250- }
251251- lastSeqCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(recCreatedAt.UnixNano()))
239239+ recCreatedAt, parseError = dateparse.ParseAny(rec.CreatedAt)
252240 case *bsky.GraphBlock:
253241 recordsProcessedCounter.WithLabelValues("graph_block", s.SocketURL).Inc()
254254- // Parse time from the event time string
255255- recCreatedAt, err := time.Parse(util.ISO8601, evt.Time)
256256- if err != nil {
257257- log.Errorf("error parsing time: %+v", err)
258258- continue
259259- }
260260- lastSeqCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(recCreatedAt.UnixNano()))
242242+ recCreatedAt, parseError = dateparse.ParseAny(rec.CreatedAt)
261243 case *bsky.GraphFollow:
262244 recordsProcessedCounter.WithLabelValues("graph_follow", s.SocketURL).Inc()
263263- // Parse time from the event time string
264264- recCreatedAt, err := time.Parse(util.ISO8601, evt.Time)
265265- if err != nil {
266266- log.Errorf("error parsing time: %+v", err)
267267- continue
268268- }
269269- lastSeqCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(recCreatedAt.UnixNano()))
245245+ recCreatedAt, parseError = dateparse.ParseAny(rec.CreatedAt)
270246 case *bsky.ActorProfile:
271247 recordsProcessedCounter.WithLabelValues("actor_profile", s.SocketURL).Inc()
272272- // Parse time from the event time string
273273- recCreatedAt, err := time.Parse(util.ISO8601, evt.Time)
274274- if err != nil {
275275- log.Errorf("error parsing time: %+v", err)
276276- continue
277277- }
278278- lastSeqCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(recCreatedAt.UnixNano()))
279248 case *bsky.FeedGenerator:
280249 recordsProcessedCounter.WithLabelValues("feed_generator", s.SocketURL).Inc()
281281- // Parse time from the event time string
282282- recCreatedAt, err := time.Parse(util.ISO8601, evt.Time)
283283- if err != nil {
284284- log.Errorf("error parsing time: %+v", err)
285285- continue
286286- }
287287- lastSeqCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(recCreatedAt.UnixNano()))
250250+ recCreatedAt, parseError = dateparse.ParseAny(rec.CreatedAt)
288251 case *bsky.GraphList:
289252 recordsProcessedCounter.WithLabelValues("graph_list", s.SocketURL).Inc()
290290- // Parse time from the event time string
291291- recCreatedAt, err := time.Parse(util.ISO8601, evt.Time)
292292- if err != nil {
293293- log.Errorf("error parsing time: %+v", err)
294294- continue
295295- }
296296- lastSeqCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(recCreatedAt.UnixNano()))
253253+ recCreatedAt, parseError = dateparse.ParseAny(rec.CreatedAt)
297254 case *bsky.GraphListitem:
298255 recordsProcessedCounter.WithLabelValues("graph_listitem", s.SocketURL).Inc()
299299- // Parse time from the event time string
300300- recCreatedAt, err := time.Parse(util.ISO8601, evt.Time)
301301- if err != nil {
302302- log.Errorf("error parsing time: %+v", err)
303303- continue
304304- }
305305- lastSeqCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(recCreatedAt.UnixNano()))
256256+ recCreatedAt, parseError = dateparse.ParseAny(rec.CreatedAt)
306257 default:
307258 log.Warnf("unknown record type: %+v", rec)
308259 }
260260+ if parseError != nil {
261261+ log.Errorf("error parsing time: %+v", parseError)
262262+ continue
263263+ }
264264+ if !recCreatedAt.IsZero() {
265265+ lastEvtCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(recCreatedAt.UnixNano()))
266266+ lastEvtCreatedRecordCreatedGapGauge.WithLabelValues(s.SocketURL).Set(float64(evtCreatedAt.Sub(recCreatedAt).Seconds()))
267267+ lastRecordCreatedEvtProcessedGapGauge.WithLabelValues(s.SocketURL).Set(float64(processedAt.Sub(recCreatedAt).Seconds()))
268268+ }
309269310270 case repomgr.EvtKindDeleteRecord:
311271 default:
···313273 }
314274 }
315275316316- eventProcessingDurationHistogram.WithLabelValues(s.SocketURL).Observe(time.Since(start).Seconds())
276276+ eventProcessingDurationHistogram.WithLabelValues(s.SocketURL).Observe(time.Since(processedAt).Seconds())
317277 return nil
318278}