this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

hepa: initial skelton of daemon

+766 -9
+8
automod/doc.go
··· 1 + // Auto-Moderation rules engine for anti-spam and other moderation tasks. 2 + // 3 + // The code in this package includes an "engine" which processes atproto commit events (and identity updates), maintains caches and counters, and pushes moderation decisions to an external mod service (eg, appview). A framework for writing new "rules" for the engine to execute are also provided. 4 + // 5 + // It does not provide label API endpoints like queryLabels; see labelmaker for a self-contained labeling service. 6 + // 7 + // Code for subscribing to a firehose is not included here; see cmd/hepa for a complete service built on this library. 8 + package automod
+99 -3
automod/engine.go
··· 1 1 package automod 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 6 + "fmt" 5 7 "log/slog" 8 + "strings" 6 9 "sync" 7 10 11 + comatproto "github.com/bluesky-social/indigo/api/atproto" 8 12 "github.com/bluesky-social/indigo/atproto/identity" 13 + "github.com/bluesky-social/indigo/atproto/syntax" 14 + lexutil "github.com/bluesky-social/indigo/lex/util" 15 + "github.com/bluesky-social/indigo/repo" 16 + "github.com/bluesky-social/indigo/repomgr" 9 17 "github.com/bluesky-social/indigo/xrpc" 10 18 ) 11 19 ··· 19 27 CountStore CountStore 20 28 } 21 29 22 - func (e *Engine) ExecuteIdentity() error { 30 + func (e *Engine) ProcessIdentityEvent(t string, did syntax.DID) error { 23 31 ctx := context.Background() 24 32 25 33 // similar to an HTTP server, we want to recover any panics from rule execution ··· 30 38 // TODO: circuit-break on repeated panics? 31 39 } 32 40 }() 41 + 42 + ident, err := e.Directory.LookupDID(ctx, did) 43 + if err != nil { 44 + return fmt.Errorf("resolving identity: %w", err) 45 + } 46 + if ident == nil { 47 + return fmt.Errorf("identity not found for did: %s", did.String()) 48 + } 49 + 50 + evt := IdentityEvent{ 51 + Event{ 52 + Engine: e, 53 + Account: AccountMeta{Identity: ident}, 54 + }, 55 + } 56 + e.CallIdentityRules(&evt) 33 57 34 58 _ = ctx 35 59 return nil 36 60 } 37 61 38 - func (e *Engine) ExecuteCommit() error { 39 - ctx := context.Background() 62 + // this method takes a full firehose commit event. it must not be a tooBig 63 + func (e *Engine) ProcessCommit(ctx context.Context, commit *comatproto.SyncSubscribeRepos_Commit) error { 40 64 41 65 // similar to an HTTP server, we want to recover any panics from rule execution 42 66 defer func() { ··· 47 71 } 48 72 }() 49 73 74 + r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(commit.Blocks)) 75 + if err != nil { 76 + // TODO: handle this case (instead of return nil) 77 + slog.Error("reading repo from car", "size_bytes", len(commit.Blocks), "err", err) 78 + return nil 79 + } 80 + 81 + did, err := syntax.ParseDID(commit.Repo) 82 + if err != nil { 83 + return fmt.Errorf("bad DID syntax in event: %w", err) 84 + } 85 + 86 + ident, err := e.Directory.LookupDID(ctx, did) 87 + if err != nil { 88 + return fmt.Errorf("resolving identity: %w", err) 89 + } 90 + if ident == nil { 91 + return fmt.Errorf("identity not found for did: %s", did.String()) 92 + } 93 + 94 + for _, op := range commit.Ops { 95 + ek := repomgr.EventKind(op.Action) 96 + logOp := slog.With("op_path", op.Path, "op_cid", op.Cid) 97 + switch ek { 98 + case repomgr.EvtKindCreateRecord: 99 + rc, rec, err := r.GetRecord(ctx, op.Path) 100 + if err != nil { 101 + // TODO: handle this case (instead of return nil) 102 + logOp.Error("fetching record from event CAR slice", "err", err) 103 + return nil 104 + } 105 + if lexutil.LexLink(rc) != *op.Cid { 106 + // TODO: handle this case (instead of return nil) 107 + logOp.Error("mismatch in record and op cid", "record_cid", rc) 108 + return nil 109 + } 110 + 111 + if strings.HasPrefix(op.Path, "app.bsky.feed.post/") { 112 + // TODO: handle as a PostEvent specially 113 + } else { 114 + // XXX: pass record in to event 115 + _ = rec 116 + evt := RecordEvent{ 117 + Event{ 118 + Engine: e, 119 + Account: AccountMeta{Identity: ident}, 120 + }, 121 + []string{}, 122 + false, 123 + []ModReport{}, 124 + []string{}, 125 + } 126 + e.CallRecordRules(&evt) 127 + // TODO persist 128 + } 129 + case repomgr.EvtKindUpdateRecord: 130 + slog.Info("ignoring record update", "did", commit.Repo, "seq", commit.Seq, "path", op.Path) 131 + return nil 132 + case repomgr.EvtKindDeleteRecord: 133 + slog.Info("ignoring record deletion", "did", commit.Repo, "seq", commit.Seq, "path", op.Path) 134 + return nil 135 + } 136 + } 137 + 50 138 _ = ctx 139 + return nil 140 + } 141 + 142 + func (e *Engine) CallIdentityRules(evt *IdentityEvent) error { 143 + return nil 144 + } 145 + 146 + func (e *Engine) CallRecordRules(evt *RecordEvent) error { 51 147 return nil 52 148 } 53 149
+7 -6
automod/event.go
··· 11 11 12 12 // information about a repo/account/identity, always pre-populated and relevant to many rules 13 13 type AccountMeta struct { 14 - Identity identity.Identity 14 + Identity *identity.Identity 15 15 // TODO: createdAt / age 16 16 } 17 17 18 18 // base type for events. events are both containers for data about the event itself (similar to an HTTP request type); aggregate results and state (counters, mod actions) to be persisted after all rules are run; and act as an API for additional network reads and operations. 19 19 type Event struct { 20 - engine Engine 20 + Engine *Engine 21 21 Err *error 22 22 Account AccountMeta 23 23 CounterIncrements []string ··· 28 28 } 29 29 30 30 func (e *Event) CountTotal(key string) int { 31 - v, err := e.engine.GetCount(key, PeriodTotal) 31 + v, err := e.Engine.GetCount(key, PeriodTotal) 32 32 if err != nil { 33 33 e.Err = &err 34 34 return 0 ··· 37 37 } 38 38 39 39 func (e *Event) CountDay(key string) int { 40 - v, err := e.engine.GetCount(key, PeriodDay) 40 + v, err := e.Engine.GetCount(key, PeriodDay) 41 41 if err != nil { 42 42 e.Err = &err 43 43 return 0 ··· 46 46 } 47 47 48 48 func (e *Event) CountHour(key string) int { 49 - v, err := e.engine.GetCount(key, PeriodHour) 49 + v, err := e.Engine.GetCount(key, PeriodHour) 50 50 if err != nil { 51 51 e.Err = &err 52 52 return 0 ··· 55 55 } 56 56 57 57 func (e *Event) InSet(name, val string) bool { 58 - v, err := e.engine.InSet(name, val) 58 + v, err := e.Engine.InSet(name, val) 59 59 if err != nil { 60 60 e.Err = &err 61 61 return false ··· 89 89 90 90 type RecordEvent struct { 91 91 Event 92 + 92 93 RecordLabels []string 93 94 RecordTakedown bool 94 95 RecordReports []ModReport
+7
cmd/hepa/README.md
··· 1 + 2 + HEPA 3 + ==== 4 + 5 + This is a simple auto-moderation daemon which wraps the automod package. 6 + 7 + The name is a reference to HEPA air filters, which help keep the local atmosphere clean and healthy for humans.
+287
cmd/hepa/firehose.go
··· 1 + package main 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "fmt" 7 + "net/http" 8 + "net/url" 9 + "strings" 10 + "time" 11 + 12 + comatproto "github.com/bluesky-social/indigo/api/atproto" 13 + //bsky "github.com/bluesky-social/indigo/api/bsky" 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + "github.com/bluesky-social/indigo/events" 16 + "github.com/bluesky-social/indigo/events/schedulers/autoscaling" 17 + "github.com/bluesky-social/indigo/repo" 18 + 19 + "github.com/carlmjohnson/versioninfo" 20 + "github.com/gorilla/websocket" 21 + "github.com/ipfs/go-cid" 22 + typegen "github.com/whyrusleeping/cbor-gen" 23 + ) 24 + 25 + func (s *Server) getLastCursor() (int64, error) { 26 + var lastSeq LastSeq 27 + if err := s.db.Find(&lastSeq).Error; err != nil { 28 + return 0, err 29 + } 30 + 31 + if lastSeq.ID == 0 { 32 + return 0, s.db.Create(&lastSeq).Error 33 + } 34 + 35 + return lastSeq.Seq, nil 36 + } 37 + 38 + func (s *Server) updateLastCursor(curs int64) error { 39 + return s.db.Model(LastSeq{}).Where("id = 1").Update("seq", curs).Error 40 + } 41 + 42 + func (s *Server) Run(ctx context.Context) error { 43 + cur, err := s.getLastCursor() 44 + if err != nil { 45 + return fmt.Errorf("get last cursor: %w", err) 46 + } 47 + 48 + err = s.bfs.LoadJobs(ctx) 49 + if err != nil { 50 + return fmt.Errorf("loading backfill jobs: %w", err) 51 + } 52 + go s.bf.Start() 53 + go s.discoverRepos() 54 + 55 + d := websocket.DefaultDialer 56 + u, err := url.Parse(s.bgshost) 57 + if err != nil { 58 + return fmt.Errorf("invalid bgshost URI: %w", err) 59 + } 60 + u.Path = "xrpc/com.atproto.sync.subscribeRepos" 61 + if cur != 0 { 62 + u.RawQuery = fmt.Sprintf("cursor=%d", cur) 63 + } 64 + con, _, err := d.Dial(u.String(), http.Header{ 65 + "User-Agent": []string{fmt.Sprintf("palomar/%s", versioninfo.Short())}, 66 + }) 67 + if err != nil { 68 + return fmt.Errorf("events dial failed: %w", err) 69 + } 70 + 71 + rsc := &events.RepoStreamCallbacks{ 72 + RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 73 + ctx := context.Background() 74 + ctx, span := tracer.Start(ctx, "RepoCommit") 75 + defer span.End() 76 + 77 + defer func() { 78 + if evt.Seq%50 == 0 { 79 + if err := s.updateLastCursor(evt.Seq); err != nil { 80 + s.logger.Error("failed to persist cursor", "err", err) 81 + } 82 + } 83 + }() 84 + logEvt := s.logger.With("repo", evt.Repo, "rev", evt.Rev, "seq", evt.Seq) 85 + if evt.TooBig && evt.Prev != nil { 86 + // TODO: handle this case (instead of return nil) 87 + logEvt.Error("skipping non-genesis tooBig events for now") 88 + return nil 89 + } 90 + 91 + if evt.TooBig { 92 + if err := s.processTooBigCommit(ctx, evt); err != nil { 93 + // TODO: handle this case (instead of return nil) 94 + logEvt.Error("failed to process tooBig event", "err", err) 95 + return nil 96 + } 97 + 98 + return nil 99 + } 100 + 101 + if !s.skipBackfill { 102 + // Check if we've backfilled this repo, if not, we should enqueue it 103 + job, err := s.bfs.GetJob(ctx, evt.Repo) 104 + if job == nil && err == nil { 105 + logEvt.Info("enqueueing backfill job for new repo") 106 + if err := s.bfs.EnqueueJob(evt.Repo); err != nil { 107 + logEvt.Warn("failed to enqueue backfill job", "err", err) 108 + } 109 + } 110 + } 111 + 112 + if err = s.engine.ProcessCommit(ctx, evt); err != nil { 113 + // TODO: handle this, instead of return nul 114 + logEvt.Error("failed to process commit", "err", err) 115 + return nil 116 + } 117 + 118 + return nil 119 + 120 + }, 121 + RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { 122 + ctx := context.Background() 123 + ctx, span := tracer.Start(ctx, "RepoHandle") 124 + defer span.End() 125 + 126 + did, err := syntax.ParseDID(evt.Did) 127 + if err != nil { 128 + s.logger.Error("bad DID in RepoHandle event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 129 + return nil 130 + } 131 + if err := s.engine.ProcessIdentityEvent("handle", did); err != nil { 132 + s.logger.Error("processing handle update failed", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 133 + } 134 + return nil 135 + }, 136 + } 137 + 138 + return events.HandleRepoStream( 139 + ctx, con, autoscaling.NewScheduler( 140 + autoscaling.DefaultAutoscaleSettings(), 141 + s.bgshost, 142 + rsc.EventHandler, 143 + ), 144 + ) 145 + } 146 + 147 + func (s *Server) discoverRepos() { 148 + ctx := context.Background() 149 + log := s.logger.With("func", "discoverRepos") 150 + log.Info("starting repo discovery") 151 + 152 + cursor := "" 153 + limit := int64(500) 154 + 155 + totalEnqueued := 0 156 + totalSkipped := 0 157 + totalErrored := 0 158 + 159 + for { 160 + resp, err := comatproto.SyncListRepos(ctx, s.bgsxrpc, cursor, limit) 161 + if err != nil { 162 + log.Error("failed to list repos", "err", err) 163 + time.Sleep(5 * time.Second) 164 + continue 165 + } 166 + log.Info("got repo page", "count", len(resp.Repos), "cursor", resp.Cursor) 167 + enqueued := 0 168 + skipped := 0 169 + errored := 0 170 + for _, repo := range resp.Repos { 171 + job, err := s.bfs.GetJob(ctx, repo.Did) 172 + if job == nil && err == nil { 173 + log.Info("enqueuing backfill job for new repo", "did", repo.Did) 174 + if err := s.bfs.EnqueueJob(repo.Did); err != nil { 175 + log.Warn("failed to enqueue backfill job", "err", err) 176 + errored++ 177 + continue 178 + } 179 + enqueued++ 180 + } else if err != nil { 181 + log.Warn("failed to get backfill job", "did", repo.Did, "err", err) 182 + errored++ 183 + } else { 184 + skipped++ 185 + } 186 + } 187 + log.Info("enqueued repos", "enqueued", enqueued, "skipped", skipped, "errored", errored) 188 + totalEnqueued += enqueued 189 + totalSkipped += skipped 190 + totalErrored += errored 191 + if resp.Cursor != nil && *resp.Cursor != "" { 192 + cursor = *resp.Cursor 193 + } else { 194 + break 195 + } 196 + } 197 + 198 + log.Info("finished repo discovery", "totalEnqueued", totalEnqueued, "totalSkipped", totalSkipped, "totalErrored", totalErrored) 199 + } 200 + 201 + func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, path string, recP *typegen.CBORMarshaler, rcid *cid.Cid) error { 202 + // Since this gets called in a backfill job, we need to check if the path is a post or profile 203 + if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") { 204 + return nil 205 + } 206 + 207 + did, err := syntax.ParseDID(rawDID) 208 + if err != nil { 209 + return fmt.Errorf("bad DID syntax in event: %w", err) 210 + } 211 + 212 + ident, err := s.dir.LookupDID(ctx, did) 213 + if err != nil { 214 + return fmt.Errorf("resolving identity: %w", err) 215 + } 216 + if ident == nil { 217 + return fmt.Errorf("identity not found for did: %s", did.String()) 218 + } 219 + rec := *recP 220 + 221 + _ = rec 222 + /* XXX: 223 + switch rec := rec.(type) { 224 + case *bsky.FeedPost: 225 + // XXX: if err := s.indexPost(ctx, ident, rec, path, *rcid); err != nil { 226 + _ = rec 227 + if err := s.engine.ProcessCommit(ctx, evt); err != nil { 228 + postsFailed.Inc() 229 + return fmt.Errorf("processing post for %s: %w", did.String(), err) 230 + } 231 + postsIndexed.Inc() 232 + case *bsky.ActorProfile: 233 + // XXX: if err := s.indexProfile(ctx, ident, rec, path, *rcid); err != nil { 234 + if err := s.engine.ProcessCommit(ctx, evt); err != nil { 235 + profilesFailed.Inc() 236 + return fmt.Errorf("processing profile for %s: %w", did.String(), err) 237 + } 238 + profilesIndexed.Inc() 239 + default: 240 + } 241 + */ 242 + return nil 243 + } 244 + 245 + func (s *Server) handleDelete(ctx context.Context, rawDID, path string) error { 246 + // TODO: just ignoring for now 247 + return nil 248 + } 249 + 250 + func (s *Server) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 251 + repodata, err := comatproto.SyncGetRepo(ctx, s.bgsxrpc, evt.Repo, "") 252 + if err != nil { 253 + return err 254 + } 255 + 256 + r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repodata)) 257 + if err != nil { 258 + return err 259 + } 260 + 261 + did, err := syntax.ParseDID(evt.Repo) 262 + if err != nil { 263 + return fmt.Errorf("bad DID in repo event: %w", err) 264 + } 265 + 266 + return r.ForEach(ctx, "", func(k string, v cid.Cid) error { 267 + if strings.HasPrefix(k, "app.bsky.feed.post") || strings.HasPrefix(k, "app.bsky.actor.profile") { 268 + rcid, rec, err := r.GetRecord(ctx, k) 269 + if err != nil { 270 + // TODO: handle this case (instead of return nil) 271 + s.logger.Error("failed to get record from repo checkout", "path", k, "err", err) 272 + return nil 273 + } 274 + 275 + // TODO: may want to treat this as a regular event? 276 + _ = rcid 277 + _ = did 278 + _ = rec 279 + /* XXX: 280 + if err := s.engine.ProcessRecord(ctx, did, m, rec); err != nil { 281 + return fmt.Errorf("processing record from tooBig commit: %w", err) 282 + } 283 + */ 284 + } 285 + return nil 286 + }) 287 + }
+193
cmd/hepa/main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log" 7 + "log/slog" 8 + "net/http" 9 + "os" 10 + "time" 11 + 12 + "github.com/bluesky-social/indigo/atproto/identity" 13 + "github.com/bluesky-social/indigo/util/cliutil" 14 + 15 + "github.com/carlmjohnson/versioninfo" 16 + _ "github.com/joho/godotenv/autoload" 17 + cli "github.com/urfave/cli/v2" 18 + "go.opentelemetry.io/otel" 19 + "go.opentelemetry.io/otel/attribute" 20 + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" 21 + "go.opentelemetry.io/otel/sdk/resource" 22 + tracesdk "go.opentelemetry.io/otel/sdk/trace" 23 + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" 24 + "golang.org/x/time/rate" 25 + ) 26 + 27 + func main() { 28 + if err := run(os.Args); err != nil { 29 + slog.Error("exiting", "err", err) 30 + os.Exit(-1) 31 + } 32 + } 33 + 34 + func run(args []string) error { 35 + 36 + app := cli.App{ 37 + Name: "hepa", 38 + Usage: "automod daemon (cleans the atmosphere)", 39 + Version: versioninfo.Short(), 40 + } 41 + 42 + app.Flags = []cli.Flag{ 43 + &cli.StringFlag{ 44 + Name: "atp-bgs-host", 45 + Usage: "hostname and port of BGS to subscribe to", 46 + Value: "wss://bsky.social", 47 + EnvVars: []string{"ATP_BGS_HOST"}, 48 + }, 49 + &cli.StringFlag{ 50 + Name: "atp-plc-host", 51 + Usage: "method, hostname, and port of PLC registry", 52 + Value: "https://plc.directory", 53 + EnvVars: []string{"ATP_PLC_HOST"}, 54 + }, 55 + &cli.IntFlag{ 56 + Name: "max-metadb-connections", 57 + EnvVars: []string{"MAX_METADB_CONNECTIONS"}, 58 + Value: 40, 59 + }, 60 + } 61 + 62 + app.Commands = []*cli.Command{ 63 + runCmd, 64 + } 65 + 66 + return app.Run(args) 67 + } 68 + 69 + var runCmd = &cli.Command{ 70 + Name: "run", 71 + Usage: "run the service", 72 + Flags: []cli.Flag{ 73 + &cli.StringFlag{ 74 + Name: "database-url", 75 + Value: "sqlite://data/hepa/automod.db", 76 + EnvVars: []string{"DATABASE_URL"}, 77 + }, 78 + &cli.BoolFlag{ 79 + Name: "readonly", 80 + EnvVars: []string{"HEPA_READONLY", "READONLY"}, 81 + }, 82 + &cli.StringFlag{ 83 + Name: "bind", 84 + Usage: "IP or address, and port, to listen on for HTTP APIs", 85 + Value: ":3999", 86 + EnvVars: []string{"HEPA_BIND"}, 87 + }, 88 + &cli.StringFlag{ 89 + Name: "metrics-listen", 90 + Usage: "IP or address, and port, to listen on for metrics APIs", 91 + Value: ":3998", 92 + EnvVars: []string{"HEPA_METRICS_LISTEN"}, 93 + }, 94 + &cli.IntFlag{ 95 + Name: "bgs-sync-rate-limit", 96 + Usage: "max repo sync (checkout) requests per second to upstream (BGS)", 97 + Value: 8, 98 + EnvVars: []string{"HEPA_BGS_SYNC_RATE_LIMIT"}, 99 + }, 100 + &cli.IntFlag{ 101 + Name: "plc-rate-limit", 102 + Usage: "max number of requests per second to PLC registry", 103 + Value: 100, 104 + EnvVars: []string{"HEPA_PLC_RATE_LIMIT"}, 105 + }, 106 + }, 107 + Action: func(cctx *cli.Context) error { 108 + ctx := context.Background() 109 + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ 110 + Level: slog.LevelInfo, 111 + })) 112 + slog.SetDefault(logger) 113 + 114 + // Enable OTLP HTTP exporter 115 + // For relevant environment variables: 116 + // https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables 117 + // At a minimum, you need to set 118 + // OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 119 + if ep := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); ep != "" { 120 + slog.Info("setting up trace exporter", "endpoint", ep) 121 + ctx, cancel := context.WithCancel(context.Background()) 122 + defer cancel() 123 + 124 + exp, err := otlptracehttp.New(ctx) 125 + if err != nil { 126 + log.Fatal("failed to create trace exporter", "error", err) 127 + } 128 + defer func() { 129 + ctx, cancel := context.WithTimeout(context.Background(), time.Second) 130 + defer cancel() 131 + if err := exp.Shutdown(ctx); err != nil { 132 + slog.Error("failed to shutdown trace exporter", "error", err) 133 + } 134 + }() 135 + 136 + tp := tracesdk.NewTracerProvider( 137 + tracesdk.WithBatcher(exp), 138 + tracesdk.WithResource(resource.NewWithAttributes( 139 + semconv.SchemaURL, 140 + semconv.ServiceNameKey.String("hepa"), 141 + attribute.String("env", os.Getenv("ENVIRONMENT")), // DataDog 142 + attribute.String("environment", os.Getenv("ENVIRONMENT")), // Others 143 + attribute.Int64("ID", 1), 144 + )), 145 + ) 146 + otel.SetTracerProvider(tp) 147 + } 148 + 149 + db, err := cliutil.SetupDatabase(cctx.String("database-url"), cctx.Int("max-metadb-connections")) 150 + if err != nil { 151 + return err 152 + } 153 + 154 + // TODO: replace this with "bingo" resolver? 155 + base := identity.BaseDirectory{ 156 + PLCURL: cctx.String("atp-plc-host"), 157 + HTTPClient: http.Client{ 158 + Timeout: time.Second * 15, 159 + }, 160 + PLCLimiter: rate.NewLimiter(rate.Limit(cctx.Int("plc-rate-limit")), 1), 161 + TryAuthoritativeDNS: true, 162 + SkipDNSDomainSuffixes: []string{".bsky.social"}, 163 + } 164 + dir := identity.NewCacheDirectory(&base, 1_500_000, time.Hour*24, time.Minute*2) 165 + 166 + srv, err := NewServer( 167 + db, 168 + &dir, 169 + Config{ 170 + BGSHost: cctx.String("atp-bgs-host"), 171 + Logger: logger, 172 + BGSSyncRateLimit: cctx.Int("bgs-sync-rate-limit"), 173 + }, 174 + ) 175 + if err != nil { 176 + return err 177 + } 178 + 179 + go func() { 180 + if err := srv.RunMetrics(cctx.String("metrics-listen")); err != nil { 181 + slog.Error("failed to start metrics endpoint", "error", err) 182 + panic(fmt.Errorf("failed to start metrics endpoint: %w", err)) 183 + } 184 + }() 185 + 186 + // TODO: if cctx.Bool("readonly") ... 187 + 188 + if err := srv.Run(ctx); err != nil { 189 + return fmt.Errorf("failed to run automod service: %w", err) 190 + } 191 + return nil 192 + }, 193 + }
+54
cmd/hepa/metrics.go
··· 1 + package main 2 + 3 + import ( 4 + "github.com/prometheus/client_golang/prometheus" 5 + "github.com/prometheus/client_golang/prometheus/promauto" 6 + "go.opentelemetry.io/otel" 7 + ) 8 + 9 + var tracer = otel.Tracer("hepa") 10 + 11 + var postsReceived = promauto.NewCounter(prometheus.CounterOpts{ 12 + Name: "hepa_posts_received", 13 + Help: "Number of posts received", 14 + }) 15 + 16 + var postsIndexed = promauto.NewCounter(prometheus.CounterOpts{ 17 + Name: "hepa_posts_indexed", 18 + Help: "Number of posts indexed", 19 + }) 20 + 21 + var postsFailed = promauto.NewCounter(prometheus.CounterOpts{ 22 + Name: "hepa_posts_failed", 23 + Help: "Number of posts that failed indexing", 24 + }) 25 + 26 + var postsDeleted = promauto.NewCounter(prometheus.CounterOpts{ 27 + Name: "hepa_posts_deleted", 28 + Help: "Number of posts deleted", 29 + }) 30 + 31 + var profilesReceived = promauto.NewCounter(prometheus.CounterOpts{ 32 + Name: "hepa_profiles_received", 33 + Help: "Number of profiles received", 34 + }) 35 + 36 + var profilesIndexed = promauto.NewCounter(prometheus.CounterOpts{ 37 + Name: "hepa_profiles_indexed", 38 + Help: "Number of profiles indexed", 39 + }) 40 + 41 + var profilesFailed = promauto.NewCounter(prometheus.CounterOpts{ 42 + Name: "hepa_profiles_failed", 43 + Help: "Number of profiles that failed indexing", 44 + }) 45 + 46 + var profilesDeleted = promauto.NewCounter(prometheus.CounterOpts{ 47 + Name: "hepa_profiles_deleted", 48 + Help: "Number of profiles deleted", 49 + }) 50 + 51 + var currentSeq = promauto.NewGauge(prometheus.GaugeOpts{ 52 + Name: "hepa_current_seq", 53 + Help: "Current sequence number", 54 + })
+111
cmd/hepa/server.go
··· 1 + package main 2 + 3 + import ( 4 + "fmt" 5 + "log/slog" 6 + "net/http" 7 + "os" 8 + "strings" 9 + 10 + "github.com/bluesky-social/indigo/atproto/identity" 11 + "github.com/bluesky-social/indigo/automod" 12 + "github.com/bluesky-social/indigo/backfill" 13 + "github.com/bluesky-social/indigo/xrpc" 14 + 15 + "github.com/prometheus/client_golang/prometheus/promhttp" 16 + gorm "gorm.io/gorm" 17 + ) 18 + 19 + type Server struct { 20 + db *gorm.DB 21 + bgshost string 22 + bgsxrpc *xrpc.Client 23 + dir identity.Directory 24 + logger *slog.Logger 25 + engine *automod.Engine 26 + skipBackfill bool 27 + 28 + bfs *backfill.Gormstore 29 + bf *backfill.Backfiller 30 + } 31 + 32 + type LastSeq struct { 33 + ID uint `gorm:"primarykey"` 34 + Seq int64 35 + } 36 + 37 + type Config struct { 38 + BGSHost string 39 + Logger *slog.Logger 40 + BGSSyncRateLimit int 41 + MaxEventConcurrency int 42 + } 43 + 44 + func NewServer(db *gorm.DB, dir identity.Directory, config Config) (*Server, error) { 45 + logger := config.Logger 46 + if logger == nil { 47 + logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ 48 + Level: slog.LevelInfo, 49 + })) 50 + } 51 + 52 + logger.Info("running database migrations") 53 + db.AutoMigrate(&LastSeq{}) 54 + db.AutoMigrate(&backfill.GormDBJob{}) 55 + 56 + bgsws := config.BGSHost 57 + if !strings.HasPrefix(bgsws, "ws") { 58 + return nil, fmt.Errorf("specified bgs host must include 'ws://' or 'wss://'") 59 + } 60 + 61 + bgshttp := strings.Replace(bgsws, "ws", "http", 1) 62 + bgsxrpc := &xrpc.Client{ 63 + Host: bgshttp, 64 + } 65 + 66 + engine := automod.Engine{} 67 + 68 + s := &Server{ 69 + db: db, 70 + bgshost: config.BGSHost, // NOTE: the original URL, not 'bgshttp' 71 + bgsxrpc: bgsxrpc, 72 + dir: dir, 73 + logger: logger, 74 + engine: &engine, 75 + skipBackfill: true, 76 + } 77 + 78 + bfstore := backfill.NewGormstore(db) 79 + opts := backfill.DefaultBackfillOptions() 80 + if config.BGSSyncRateLimit > 0 { 81 + opts.SyncRequestsPerSecond = config.BGSSyncRateLimit 82 + opts.ParallelBackfills = 2 * config.BGSSyncRateLimit 83 + } else { 84 + opts.SyncRequestsPerSecond = 8 85 + } 86 + opts.CheckoutPath = fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo", bgshttp) 87 + if config.MaxEventConcurrency > 0 { 88 + opts.ParallelRecordCreates = config.MaxEventConcurrency 89 + } else { 90 + opts.ParallelRecordCreates = 20 91 + } 92 + opts.NSIDFilter = "app.bsky." 93 + bf := backfill.NewBackfiller( 94 + "hepa", 95 + bfstore, 96 + s.handleCreateOrUpdate, 97 + s.handleCreateOrUpdate, 98 + s.handleDelete, 99 + opts, 100 + ) 101 + 102 + s.bfs = bfstore 103 + s.bf = bf 104 + 105 + return s, nil 106 + } 107 + 108 + func (s *Server) RunMetrics(listen string) error { 109 + http.Handle("/metrics", promhttp.Handler()) 110 + return http.ListenAndServe(listen, nil) 111 + }