this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Create Sonar, an ATProto RepoSync Stats Generator (#217)

Sonar is an ATProto SubscribeRepos firehose listener that emits
Prometheus metrics to characterize the kind and volume of content moving
through the firehose.

authored by

Jaz and committed by
GitHub
80c8a4f7 c51808d8

+662 -5
+3
.gitignore
··· 36 36 37 37 # Don't commit your (default location) creds 38 38 bsky.auth 39 + 40 + # Sonar cursor file 41 + sonar_cursor.json
+5
Makefile
··· 21 21 go build ./cmd/stress 22 22 go build ./cmd/fakermaker 23 23 go build ./cmd/labelmaker 24 + go build -o ./sonar-cli ./cmd/sonar 24 25 25 26 .PHONY: all 26 27 all: build ··· 74 75 .PHONY: run-dev-search 75 76 run-dev-search: .env ## Runs search daemon for local dev 76 77 GOLOG_LOG_LEVEL=info go run ./cmd/palomar run 78 + 79 + .PHONY: sonar-up 80 + sonar-up: # Runs sonar docker container 81 + docker compose -f cmd/sonar/docker-compose.yml up --build -d || docker-compose -f cmd/sonar/docker-compose.yml up --build -d
+29
cmd/sonar/Dockerfile
··· 1 + # Stage 1: Build the Go binary 2 + FROM golang:1.20.5-alpine AS builder 3 + 4 + # Install SSL ca certificates. 5 + RUN apk update && apk add --no-cache ca-certificates && update-ca-certificates 6 + 7 + # Create a directory for the application 8 + WORKDIR /app 9 + 10 + # Fetch dependencies 11 + COPY go.mod go.sum ./ 12 + RUN go mod download 13 + 14 + COPY . . 15 + 16 + # Build the application 17 + RUN CGO_ENABLED=0 GOOS=linux go build -o /sonar ./cmd/sonar 18 + 19 + # Stage 2: Build a minimal Docker image 20 + FROM scratch 21 + 22 + # Import the SSL certificates from the first stage. 23 + COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt 24 + 25 + # Copy the binary from the first stage. 26 + COPY --from=builder /sonar /sonar 27 + 28 + # Set the startup command to run the binary 29 + CMD ["/sonar"]
+10
cmd/sonar/docker-compose.yml
··· 1 + version: "3.8" 2 + services: 3 + sonar: 4 + build: 5 + context: ../../ 6 + dockerfile: cmd/sonar/Dockerfile 7 + image: atproto-sonar 8 + restart: always 9 + ports: 10 + - "8345:8345"
+240
cmd/sonar/main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log" 7 + "net/http" 8 + "net/url" 9 + "os" 10 + "os/signal" 11 + "sync" 12 + "syscall" 13 + "time" 14 + 15 + "github.com/bluesky-social/indigo/events" 16 + "github.com/bluesky-social/indigo/sonar" 17 + "github.com/bluesky-social/indigo/version" 18 + "github.com/gorilla/websocket" 19 + "github.com/prometheus/client_golang/prometheus/promhttp" 20 + "go.uber.org/zap" 21 + 22 + "github.com/urfave/cli/v2" 23 + ) 24 + 25 + func main() { 26 + app := cli.App{ 27 + Name: "sonar", 28 + Usage: "atproto firehose monitoring tool", 29 + Version: version.Version, 30 + } 31 + 32 + app.Flags = []cli.Flag{ 33 + &cli.StringFlag{ 34 + Name: "ws-url", 35 + Usage: "full websocket path to the ATProto SubscribeRepos XRPC endpoint", 36 + Value: "wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos", 37 + }, 38 + &cli.StringFlag{ 39 + Name: "log-level", 40 + Usage: "log level", 41 + Value: "info", 42 + }, 43 + &cli.IntFlag{ 44 + Name: "port", 45 + Usage: "listen port for metrics server", 46 + Value: 8345, 47 + }, 48 + &cli.IntFlag{ 49 + Name: "worker-count", 50 + Usage: "number of workers to process events", 51 + Value: 10, 52 + }, 53 + &cli.IntFlag{ 54 + Name: "max-queue-size", 55 + Usage: "max number of events to queue", 56 + Value: 10, 57 + }, 58 + &cli.StringFlag{ 59 + Name: "cursor-file", 60 + Usage: "path to cursor file", 61 + Value: "sonar_cursor.json", 62 + }, 63 + } 64 + 65 + app.Action = Sonar 66 + 67 + err := app.Run(os.Args) 68 + if err != nil { 69 + log.Fatal(err) 70 + } 71 + } 72 + 73 + func Sonar(cctx *cli.Context) error { 74 + ctx := cctx.Context 75 + ctx, cancel := context.WithCancel(ctx) 76 + defer cancel() 77 + 78 + // Trap SIGINT to trigger a shutdown. 79 + signals := make(chan os.Signal, 1) 80 + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 81 + 82 + rawlog, err := zap.NewProduction() 83 + if err != nil { 84 + log.Fatalf("failed to create logger: %+v\n", err) 85 + } 86 + defer func() { 87 + log.Printf("main function teardown\n") 88 + err := rawlog.Sync() 89 + if err != nil { 90 + log.Printf("failed to sync logger on teardown: %+v", err.Error()) 91 + } 92 + }() 93 + 94 + log := rawlog.Sugar().With("source", "sonar_main") 95 + 96 + log.Info("starting sonar") 97 + 98 + u, err := url.Parse(cctx.String("ws-url")) 99 + if err != nil { 100 + log.Fatalf("failed to parse ws-url: %+v\n", err) 101 + } 102 + 103 + s, err := sonar.NewSonar(log, cctx.String("cursor-file")) 104 + if err != nil { 105 + log.Fatalf("failed to create sonar: %+v\n", err) 106 + } 107 + 108 + wg := sync.WaitGroup{} 109 + 110 + pool := events.NewConsumerPool(cctx.Int("worker-count"), cctx.Int("max-queue-size"), s.HandleStreamEvent) 111 + 112 + // Start a goroutine to manage the cursor file, saving the current cursor every 5 seconds. 113 + go func() { 114 + wg.Add(1) 115 + defer wg.Done() 116 + ticker := time.NewTicker(5 * time.Second) 117 + rawlog, err := zap.NewProduction() 118 + if err != nil { 119 + log.Fatalf("failed to create logger: %+v\n", err) 120 + } 121 + log := rawlog.Sugar().With("source", "cursor_file_manager") 122 + 123 + for { 124 + select { 125 + case <-ctx.Done(): 126 + log.Info("shutting down cursor file manager") 127 + err := s.WriteCursorFile() 128 + if err != nil { 129 + log.Errorf("failed to write cursor file: %+v\n", err) 130 + } 131 + log.Info("cursor file manager shut down successfully") 132 + return 133 + case <-ticker.C: 134 + err := s.WriteCursorFile() 135 + if err != nil { 136 + log.Errorf("failed to write cursor file: %+v\n", err) 137 + } 138 + } 139 + } 140 + }() 141 + 142 + // Start a goroutine to manage the liveness checker, shutting down if no events are received for 15 seconds 143 + go func() { 144 + wg.Add(1) 145 + defer wg.Done() 146 + ticker := time.NewTicker(15 * time.Second) 147 + lastSeq := int64(0) 148 + 149 + rawlog, err := zap.NewProduction() 150 + if err != nil { 151 + log.Fatalf("failed to create logger: %+v\n", err) 152 + } 153 + log := rawlog.Sugar().With("source", "liveness_checker") 154 + 155 + for { 156 + select { 157 + case <-ctx.Done(): 158 + log.Info("shutting down liveness checker") 159 + return 160 + case <-ticker.C: 161 + s.ProgMux.Lock() 162 + seq := s.Progress.LastSeq 163 + s.ProgMux.Unlock() 164 + if seq <= lastSeq { 165 + log.Errorf("no new events in last 15 seconds, shutting down for docker to restart me") 166 + cancel() 167 + } else { 168 + log.Infof("last event sequence: %d", seq) 169 + lastSeq = seq 170 + } 171 + } 172 + } 173 + }() 174 + 175 + mux := http.NewServeMux() 176 + mux.Handle("/metrics", promhttp.Handler()) 177 + 178 + metricServer := &http.Server{ 179 + Addr: fmt.Sprintf(":%d", cctx.Int("port")), 180 + Handler: mux, 181 + } 182 + 183 + // Startup metrics server 184 + go func() { 185 + wg.Add(1) 186 + defer wg.Done() 187 + rawlog, err := zap.NewProduction() 188 + if err != nil { 189 + log.Fatalf("failed to create logger: %+v\n", err) 190 + } 191 + log := rawlog.Sugar().With("source", "metrics_server") 192 + 193 + log.Infof("metrics server listening on port %d", cctx.Int("port")) 194 + 195 + if err := metricServer.ListenAndServe(); err != http.ErrServerClosed { 196 + log.Fatalf("failed to start metrics server: %+v\n", err) 197 + } 198 + log.Info("metrics server shut down successfully") 199 + }() 200 + 201 + if s.Progress.LastSeq >= 0 { 202 + u.RawQuery = fmt.Sprintf("cursor=%d", s.Progress.LastSeq) 203 + } 204 + 205 + log.Infof("connecting to WebSocket at: %s", u.String()) 206 + c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) 207 + if err != nil { 208 + log.Infof("failed to connect to websocket: %v", err) 209 + return err 210 + } 211 + defer c.Close() 212 + 213 + go func() { 214 + wg.Add(1) 215 + defer wg.Done() 216 + err = events.HandleRepoStream(ctx, c, pool) 217 + log.Infof("HandleRepoStream returned unexpectedly: %+v...", err) 218 + cancel() 219 + }() 220 + 221 + select { 222 + case <-signals: 223 + cancel() 224 + fmt.Println("shutting down on signal") 225 + case <-ctx.Done(): 226 + fmt.Println("shutting down on context done") 227 + } 228 + 229 + log.Info("shutting down, waiting for workers to clean up...") 230 + 231 + if err := metricServer.Shutdown(ctx); err != nil { 232 + log.Errorf("failed to shut down metrics server: %+v\n", err) 233 + wg.Done() 234 + } 235 + 236 + wg.Wait() 237 + log.Info("shut down successfully") 238 + 239 + return nil 240 + }
-4
indexer/indexer.go
··· 155 155 return fmt.Errorf("handle recordCreate: %w", err) 156 156 } 157 157 } 158 - case repomgr.EvtKindInitActor: 159 - if err := ix.handleInitActor(ctx, evt, op); err != nil { 160 - return fmt.Errorf("handle initActor: %w", err) 161 - } 162 158 case repomgr.EvtKindDeleteRecord: 163 159 if ix.doAggregations { 164 160 if err := ix.handleRecordDelete(ctx, evt, op, true); err != nil {
-1
repomgr/repomgr.go
··· 98 98 EvtKindCreateRecord = EventKind("create") 99 99 EvtKindUpdateRecord = EventKind("update") 100 100 EvtKindDeleteRecord = EventKind("delete") 101 - EvtKindInitActor = EventKind("init") 102 101 ) 103 102 104 103 type RepoHead struct {
+59
sonar/metrics.go
··· 1 + package sonar 2 + 3 + import ( 4 + "github.com/prometheus/client_golang/prometheus" 5 + "github.com/prometheus/client_golang/prometheus/promauto" 6 + ) 7 + 8 + // Initialize Prometheus Metrics for total number of posts processed 9 + var eventsProcessedCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 10 + Name: "sonar_events_processed_total", 11 + Help: "The total number of firehose events processed by Sonar", 12 + }, []string{"event_type"}) 13 + 14 + var rebasesProcessedCounter = promauto.NewCounter(prometheus.CounterOpts{ 15 + Name: "sonar_rebases_processed_total", 16 + Help: "The total number of rebase operations processed by Sonar", 17 + }) 18 + 19 + var recordsProcessedCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 20 + Name: "sonar_records_processed_total", 21 + Help: "The total number of records processed by Sonar", 22 + }, []string{"record_type"}) 23 + 24 + var quoteRepostsProcessedCounter = promauto.NewCounter(prometheus.CounterOpts{ 25 + Name: "sonar_quote_reposts_processed_total", 26 + Help: "The total number quote repost operations processed by Sonar", 27 + }) 28 + 29 + var opsProcessedCounter = promauto.NewCounterVec(prometheus.CounterOpts{ 30 + Name: "sonar_ops_processed_total", 31 + Help: "The total number of repo operations processed by Sonar", 32 + }, []string{"kind", "op_path"}) 33 + 34 + // Initialize Prometheus metrics for duration of processing events 35 + var eventProcessingDurationHistogram = promauto.NewHistogram(prometheus.HistogramOpts{ 36 + Name: "sonar_event_processing_duration_seconds", 37 + Help: "The amount of time it takes to process a firehose event", 38 + Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), 39 + }) 40 + 41 + var lastSeqGauge = promauto.NewGauge(prometheus.GaugeOpts{ 42 + Name: "sonar_last_seq", 43 + Help: "The last sequence number processed", 44 + }) 45 + 46 + var lastSeqProcessedAtGauge = promauto.NewGauge(prometheus.GaugeOpts{ 47 + Name: "sonar_last_seq_processed_at", 48 + Help: "The timestamp of the last sequence number processed", 49 + }) 50 + 51 + var lastSeqCreatedAtGauge = promauto.NewGauge(prometheus.GaugeOpts{ 52 + Name: "sonar_last_seq_created_at", 53 + Help: "The timestamp of the last sequence number created", 54 + }) 55 + 56 + var lastSeqCommittedAtGauge = promauto.NewGauge(prometheus.GaugeOpts{ 57 + Name: "sonar_last_seq_committed_at", 58 + Help: "The commit timestamp of the last sequence number processed", 59 + })
+316
sonar/sonar.go
··· 1 + package sonar 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "fmt" 7 + "os" 8 + "strings" 9 + "sync" 10 + "time" 11 + 12 + comatproto "github.com/bluesky-social/indigo/api/atproto" 13 + "github.com/bluesky-social/indigo/api/bsky" 14 + lexutil "github.com/bluesky-social/indigo/lex/util" 15 + "github.com/bluesky-social/indigo/util" 16 + "github.com/goccy/go-json" 17 + "github.com/labstack/gommon/log" 18 + 19 + "github.com/bluesky-social/indigo/events" 20 + "github.com/bluesky-social/indigo/repo" 21 + "github.com/bluesky-social/indigo/repomgr" 22 + "go.opentelemetry.io/otel" 23 + "go.uber.org/zap" 24 + ) 25 + 26 + type Sonar struct { 27 + Progress *Progress 28 + ProgMux sync.Mutex 29 + Logger *zap.SugaredLogger 30 + CursorFile string 31 + } 32 + 33 + type Progress struct { 34 + LastSeq int64 `json:"last_seq"` 35 + LastSeqProcessedAt time.Time `json:"last_seq_processed_at"` 36 + } 37 + 38 + func (s *Sonar) WriteCursorFile() error { 39 + // Marshal the cursor file 40 + s.ProgMux.Lock() 41 + data, err := json.Marshal(s.Progress) 42 + s.ProgMux.Unlock() 43 + if err != nil { 44 + return fmt.Errorf("failed to marshal cursor file: %+v", err) 45 + } 46 + 47 + // Write the cursor file 48 + err = os.WriteFile(s.CursorFile, data, 0644) 49 + if err != nil { 50 + return fmt.Errorf("failed to write cursor file: %+v", err) 51 + } 52 + 53 + return nil 54 + } 55 + 56 + func (s *Sonar) ReadCursorFile() error { 57 + // Read the cursor file 58 + data, err := os.ReadFile(s.CursorFile) 59 + if err != nil { 60 + return fmt.Errorf("failed to read cursor file: %+v", err) 61 + } 62 + 63 + // Unmarshal the cursor file 64 + s.ProgMux.Lock() 65 + err = json.Unmarshal(data, s.Progress) 66 + s.ProgMux.Unlock() 67 + if err != nil { 68 + return fmt.Errorf("failed to unmarshal cursor file: %+v", err) 69 + } 70 + 71 + return nil 72 + } 73 + 74 + func NewSonar(logger *zap.SugaredLogger, cursorFile string) (*Sonar, error) { 75 + s := Sonar{ 76 + Progress: &Progress{ 77 + LastSeq: -1, 78 + }, 79 + Logger: logger, 80 + ProgMux: sync.Mutex{}, 81 + CursorFile: cursorFile, 82 + } 83 + 84 + // Check to see if the cursor file exists 85 + if _, err := os.Stat(cursorFile); os.IsNotExist(err) { 86 + logger.Infof("cursor file does not exist, creating %s", cursorFile) 87 + // Create the cursor file 88 + err := s.WriteCursorFile() 89 + if err != nil { 90 + return nil, fmt.Errorf("failed to write cursor file: %+v", err) 91 + } 92 + } else { 93 + // Read the cursor file 94 + err := s.ReadCursorFile() 95 + if err != nil { 96 + logger.Errorf("read cursor file, will start drinking from live: %+v", err.Error()) 97 + } 98 + } 99 + 100 + return &s, nil 101 + } 102 + 103 + func (s *Sonar) HandleStreamEvent(ctx context.Context, xe *events.XRPCStreamEvent) error { 104 + ctx, span := otel.Tracer("sonar").Start(ctx, "HandleStreamEvent") 105 + defer span.End() 106 + 107 + switch { 108 + case xe.RepoCommit != nil: 109 + eventsProcessedCounter.WithLabelValues("repo_commit").Inc() 110 + return s.HandleRepoCommit(ctx, xe.RepoCommit) 111 + case xe.RepoHandle != nil: 112 + eventsProcessedCounter.WithLabelValues("repo_handle").Inc() 113 + now := time.Now() 114 + s.ProgMux.Lock() 115 + s.Progress.LastSeq = xe.RepoHandle.Seq 116 + s.Progress.LastSeqProcessedAt = now 117 + s.ProgMux.Unlock() 118 + // Parse time from the event time string 119 + t, err := time.Parse(time.RFC3339, xe.RepoHandle.Time) 120 + if err != nil { 121 + log.Errorf("error parsing time: %+v", err) 122 + return nil 123 + } 124 + lastSeqCommittedAtGauge.Set(float64(t.UnixNano())) 125 + lastSeqProcessedAtGauge.Set(float64(now.UnixNano())) 126 + lastSeqGauge.Set(float64(xe.RepoHandle.Seq)) 127 + case xe.RepoInfo != nil: 128 + eventsProcessedCounter.WithLabelValues("repo_info").Inc() 129 + case xe.RepoMigrate != nil: 130 + eventsProcessedCounter.WithLabelValues("repo_migrate").Inc() 131 + now := time.Now() 132 + s.ProgMux.Lock() 133 + s.Progress.LastSeq = xe.RepoMigrate.Seq 134 + s.Progress.LastSeqProcessedAt = time.Now() 135 + s.ProgMux.Unlock() 136 + // Parse time from the event time string 137 + t, err := time.Parse(time.RFC3339, xe.RepoMigrate.Time) 138 + if err != nil { 139 + log.Errorf("error parsing time: %+v", err) 140 + return nil 141 + } 142 + lastSeqCommittedAtGauge.Set(float64(t.UnixNano())) 143 + lastSeqProcessedAtGauge.Set(float64(now.UnixNano())) 144 + lastSeqGauge.Set(float64(xe.RepoHandle.Seq)) 145 + case xe.RepoTombstone != nil: 146 + eventsProcessedCounter.WithLabelValues("repo_tombstone").Inc() 147 + case xe.LabelInfo != nil: 148 + eventsProcessedCounter.WithLabelValues("label_info").Inc() 149 + case xe.LabelLabels != nil: 150 + eventsProcessedCounter.WithLabelValues("label_labels").Inc() 151 + case xe.Error != nil: 152 + eventsProcessedCounter.WithLabelValues("error").Inc() 153 + } 154 + return nil 155 + } 156 + 157 + func (s *Sonar) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 158 + ctx, span := otel.Tracer("sonar").Start(ctx, "HandleRepoCommit") 159 + defer span.End() 160 + 161 + start := time.Now() 162 + 163 + s.ProgMux.Lock() 164 + s.Progress.LastSeq = evt.Seq 165 + s.Progress.LastSeqProcessedAt = start 166 + s.ProgMux.Unlock() 167 + 168 + lastSeqGauge.Set(float64(evt.Seq)) 169 + 170 + log := s.Logger.With("repo", evt.Repo, "seq", evt.Seq, "commit", evt.Commit) 171 + 172 + rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 173 + if err != nil { 174 + log.Errorf("failed to read repo from car: %+v\n", err) 175 + return nil 176 + } 177 + 178 + if evt.Rebase { 179 + log.Debug("rebase") 180 + rebasesProcessedCounter.Inc() 181 + } 182 + 183 + // Parse time from the event time string 184 + t, err := time.Parse(time.RFC3339, evt.Time) 185 + if err != nil { 186 + log.Errorf("error parsing time: %+v", err) 187 + return nil 188 + } 189 + 190 + lastSeqCommittedAtGauge.Set(float64(t.UnixNano())) 191 + lastSeqProcessedAtGauge.Set(float64(start.UnixNano())) 192 + 193 + for _, op := range evt.Ops { 194 + collection := strings.Split(op.Path, "/")[0] 195 + 196 + ek := repomgr.EventKind(op.Action) 197 + log = log.With("action", op.Action, "collection", collection) 198 + 199 + opsProcessedCounter.WithLabelValues(op.Action, collection).Inc() 200 + 201 + switch ek { 202 + case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 203 + // Grab the record from the merkel tree 204 + rc, rec, err := rr.GetRecord(ctx, op.Path) 205 + if err != nil { 206 + e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err) 207 + log.Errorf("failed to get a record from the event: %+v\n", e) 208 + break 209 + } 210 + 211 + // Verify that the record cid matches the cid in the event 212 + if lexutil.LexLink(rc) != *op.Cid { 213 + e := fmt.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid) 214 + log.Errorf("failed to LexLink the record in the event: %+v\n", e) 215 + break 216 + } 217 + 218 + // Unpack the record and process it 219 + switch rec := rec.(type) { 220 + case *bsky.FeedPost: 221 + recordsProcessedCounter.WithLabelValues("feed_post").Inc() 222 + if rec.Embed != nil && rec.Embed.EmbedRecord != nil && rec.Embed.EmbedRecord.Record != nil { 223 + quoteRepostsProcessedCounter.Inc() 224 + } 225 + // Parse time from the event time string 226 + recCreatedAt, err := time.Parse(util.ISO8601, evt.Time) 227 + if err != nil { 228 + log.Errorf("error parsing time: %+v", err) 229 + continue 230 + } 231 + lastSeqCreatedAtGauge.Set(float64(recCreatedAt.UnixNano())) 232 + case *bsky.FeedLike: 233 + recordsProcessedCounter.WithLabelValues("feed_like").Inc() 234 + // Parse time from the event time string 235 + recCreatedAt, err := time.Parse(util.ISO8601, evt.Time) 236 + if err != nil { 237 + log.Errorf("error parsing time: %+v", err) 238 + continue 239 + } 240 + lastSeqCreatedAtGauge.Set(float64(recCreatedAt.UnixNano())) 241 + case *bsky.FeedRepost: 242 + recordsProcessedCounter.WithLabelValues("feed_repost").Inc() 243 + // Parse time from the event time string 244 + recCreatedAt, err := time.Parse(util.ISO8601, evt.Time) 245 + if err != nil { 246 + log.Errorf("error parsing time: %+v", err) 247 + continue 248 + } 249 + lastSeqCreatedAtGauge.Set(float64(recCreatedAt.UnixNano())) 250 + case *bsky.GraphBlock: 251 + recordsProcessedCounter.WithLabelValues("graph_block").Inc() 252 + // Parse time from the event time string 253 + recCreatedAt, err := time.Parse(util.ISO8601, evt.Time) 254 + if err != nil { 255 + log.Errorf("error parsing time: %+v", err) 256 + continue 257 + } 258 + lastSeqCreatedAtGauge.Set(float64(recCreatedAt.UnixNano())) 259 + case *bsky.GraphFollow: 260 + recordsProcessedCounter.WithLabelValues("graph_follow").Inc() 261 + // Parse time from the event time string 262 + recCreatedAt, err := time.Parse(util.ISO8601, evt.Time) 263 + if err != nil { 264 + log.Errorf("error parsing time: %+v", err) 265 + continue 266 + } 267 + lastSeqCreatedAtGauge.Set(float64(recCreatedAt.UnixNano())) 268 + case *bsky.ActorProfile: 269 + recordsProcessedCounter.WithLabelValues("actor_profile").Inc() 270 + // Parse time from the event time string 271 + recCreatedAt, err := time.Parse(util.ISO8601, evt.Time) 272 + if err != nil { 273 + log.Errorf("error parsing time: %+v", err) 274 + continue 275 + } 276 + lastSeqCreatedAtGauge.Set(float64(recCreatedAt.UnixNano())) 277 + case *bsky.FeedGenerator: 278 + recordsProcessedCounter.WithLabelValues("feed_generator").Inc() 279 + // Parse time from the event time string 280 + recCreatedAt, err := time.Parse(util.ISO8601, evt.Time) 281 + if err != nil { 282 + log.Errorf("error parsing time: %+v", err) 283 + continue 284 + } 285 + lastSeqCreatedAtGauge.Set(float64(recCreatedAt.UnixNano())) 286 + case *bsky.GraphList: 287 + recordsProcessedCounter.WithLabelValues("graph_list").Inc() 288 + // Parse time from the event time string 289 + recCreatedAt, err := time.Parse(util.ISO8601, evt.Time) 290 + if err != nil { 291 + log.Errorf("error parsing time: %+v", err) 292 + continue 293 + } 294 + lastSeqCreatedAtGauge.Set(float64(recCreatedAt.UnixNano())) 295 + case *bsky.GraphListitem: 296 + recordsProcessedCounter.WithLabelValues("graph_listitem").Inc() 297 + // Parse time from the event time string 298 + recCreatedAt, err := time.Parse(util.ISO8601, evt.Time) 299 + if err != nil { 300 + log.Errorf("error parsing time: %+v", err) 301 + continue 302 + } 303 + lastSeqCreatedAtGauge.Set(float64(recCreatedAt.UnixNano())) 304 + default: 305 + log.Warnf("unknown record type: %+v", rec) 306 + } 307 + 308 + case repomgr.EvtKindDeleteRecord: 309 + default: 310 + log.Warnf("unknown event kind from op action: %+v", op.Action) 311 + } 312 + } 313 + 314 + eventProcessingDurationHistogram.Observe(time.Since(start).Seconds()) 315 + return nil 316 + }