···1122-atproto Relay Service
33-===============================
22+`relay`: atproto relay reference implementation
33+===============================================
4455*NOTE: "relays" used to be called "Big Graph Servers", or "BGS", or "bigsky". Many variables and packages still reference "bgs"*
6677-This is the implementation of an atproto relay which is running in the production network, written and operated by Bluesky.
77+This is a reference implementation of an atproto relay, written and operated by Bluesky.
8899In atproto, a relay subscribes to multiple PDS hosts and outputs a combined "firehose" event stream. Downstream services can subscribe to this single firehose a get all relevant events for the entire network, or a specific sub-graph of the network. The relay maintains a mirror of repo data from all accounts on the upstream PDS instances, and verifies repo data structure integrity and identity signatures. It is agnostic to applications, and does not validate data against atproto Lexicon schemas.
1010···35353636You can re-build and run the command directly to get a list of configuration flags and env vars; env vars will be loaded from `.env` if that file exists:
37373838- RELAY_ADMIN_KEY=localdev go run ./cmd/relay/ --help
3838+ RELAY_ADMIN_PASSWORD=dummy go run ./cmd/relay/ --help
39394040By default, the daemon will use sqlite for databases (in the directory `./data/relay/`) and the HTTP API will be bound to localhost port 2470.
4141···5050 # careful! double-check this destructive command
5151 rm -rf ./data/relay/*
52525353-There is a basic web dashboard, though it will not be included unless built and copied to a local directory `./public/`. Run `make build-relay-ui`, and then when running the daemon the dashboard will be available at: <http://localhost:2470/dash/>. Paste in the admin key, eg `localdev`.
5353+There is a basic web dashboard, though it will not be included unless built and copied to a local directory `./public/`. Run `make build-relay-ui`, and then when running the daemon the dashboard will be available at: <http://localhost:2470/dash/>. Paste in the admin key, eg `dummy`.
54545555-The local admin routes can also be accessed by passing the admin key as a bearer token, for example:
5555+The local admin routes can also be accessed by passing the admin password using HTTP Basic auth (with username `admin`), for example:
56565757- http get :2470/admin/pds/list Authorization:"Bearer localdev"
5757+ http get :2470/admin/pds/list -a admin:dummy
58585959Request crawl of an individual PDS instance like:
60606161- http post :2470/admin/pds/requestCrawl Authorization:"Bearer localdev" hostname=pds.example.com
6161+ http post :2470/admin/pds/requestCrawl -a admin:dummy hostname=pds.example.com
626263636464## Docker Containers
···93939494Some notable configuration env vars to set:
95959696-- `ENVIRONMENT`: eg, `production`
9797-- `DATABASE_URL`: see section below
9898-- `GOLOG_LOG_LEVEL`: log verbosity
9696+- `RELAY_ADMIN_PASSWORD`
9797+- `DATABASE_URL`: eg, `postgres://relay:CHANGEME@localhost:5432/relay`
9898+- `RELAY_PERSIST_DIR`: storage location for "backfill" events, eg `/data/relay/persist`
9999+- `RELAY_REPLAY_WINDOW`: the duration of output "backfill window", eg `24h`
100100+- `RELAY_LENIENT_SYNC_VALIDATION`: if `true`, allow legacy upstreams which don't implement atproto sync v1.1
101101+- `RELAY_TRUSTED_DOMAINS`: patterns of PDS hosts which get larger quotas by default, eg `*.host.bsky.network`
99102100100-There is a health check endpoint at `/xrpc/_health`. Prometheus metrics are exposed by default on port 2471, path `/metrics`. The service logs fairly verbosely to stderr; use `GOLOG_LOG_LEVEL` to control log volume.
103103+There is a health check endpoint at `/xrpc/_health`. Prometheus metrics are exposed by default on port 2471, path `/metrics`. The service logs fairly verbosely to stdout; use `LOG_LEVEL` to control log volume (`warn`, `info`, etc).
101104102105Be sure to double-check bandwidth usage and pricing if running a public relay! Bandwidth prices can vary widely between providers, and popular cloud services (AWS, Google Cloud, Azure) are very expensive compared to alternatives like OVH or Hetzner.
103103-104104-105105-## Bootstrapping the Network
106106-107107-To bootstrap the entire network, you'll want to start with a list of large PDS instances to backfill from. You could pull from a public dashboard of instances (like [mackuba's](https://blue.mackuba.eu/directory/pdses)), or scrape the full DID PLC directory, parse out all PDS service declarations, and sort by count.
108108-109109-Once you have a set of PDS hosts, you can put the bare hostnames (not URLs: no `https://` prefix, port, or path suffix) in a `hosts.txt` file, and then use the `crawl_pds.sh` script to backfill and configure limits for all of them:
110110-111111- export RELAY_HOST=your.pds.hostname.tld
112112- export RELAY_ADMIN_KEY=your-secret-key
113113-114114- # both request crawl, and set generous crawl limits for each
115115- cat hosts.txt | parallel -j1 ./crawl_pds.sh {}
116116-117117-Just consuming from the firehose for a few hours will only backfill accounts with activity during that period. This is fine to get the backfill process started, but eventually you'll want to do full "resync" of all the repositories on the PDS host to the most recent repo rev version. To enqueue that for all the PDS instances:
118118-119119- # start sync/backfill of all accounts
120120- cat hosts.txt | parallel -j1 ./sync_pds.sh {}
121121-122122-Lastly, can monitor progress of any ongoing re-syncs:
123123-124124- # check sync progress for all hosts
125125- cat hosts.txt | parallel -j1 ./sync_pds.sh {}
126126-127127-128128-## Admin API
129129-130130-The relay has a number of admin HTTP API endpoints. Given a relay setup listening on port 2470 and with a reasonably secure admin secret:
131131-132132-```
133133-RELAY_ADMIN_PASSWORD=$(openssl rand --hex 16)
134134-relay --api-listen :2470 --admin-key ${RELAY_ADMIN_PASSWORD} ...
135135-```
136136-137137-One can, for example, begin compaction of all repos
138138-139139-```
140140-curl -H 'Authorization: Bearer '${RELAY_ADMIN_PASSWORD} -H 'Content-Type: application/x-www-form-urlencoded' --data '' http://127.0.0.1:2470/admin/repo/compactAll
141141-```
142142-143143-### /admin/subs/getUpstreamConns
144144-145145-Return list of PDS host names in json array of strings: ["host", ...]
146146-147147-### /admin/subs/perDayLimit
148148-149149-Return `{"limit": int}` for the number of new PDS subscriptions that the relay may start in a rolling 24 hour window.
150150-151151-### /admin/subs/setPerDayLimit
152152-153153-POST with `?limit={int}` to set the number of new PDS subscriptions that the relay may start in a rolling 24 hour window.
154154-155155-### /admin/subs/setEnabled
156156-157157-POST with param `?enabled=true` or `?enabled=false` to enable or disable PDS-requested new-PDS crawling.
158158-159159-### /admin/subs/getEnabled
160160-161161-Return `{"enabled": bool}` if non-admin new PDS crawl requests are enabled
162162-163163-### /admin/subs/killUpstream
164164-165165-POST with `?host={pds host name}` to disconnect from their firehose.
166166-167167-Optionally add `&block=true` to prevent connecting to them in the future.
168168-169169-### /admin/subs/listDomainBans
170170-171171-Return `{"banned_domains": ["host name", ...]}`
172172-173173-### /admin/subs/banDomain
174174-175175-POST `{"Domain": "host name"}` to ban a domain
176176-177177-### /admin/subs/unbanDomain
178178-179179-POST `{"Domain": "host name"}` to un-ban a domain
180180-181181-### /admin/repo/takeDown
182182-183183-POST `{"did": "did:..."}` to take-down a bad repo; deletes all local data for the repo
184184-185185-### /admin/repo/reverseTakedown
186186-187187-POST `?did={did:...}` to reverse a repo take-down
188188-189189-### /admin/pds/requestCrawl
190190-191191-POST `{"hostname":"pds host"}` to start crawling a PDS
192192-193193-### /admin/pds/list
194194-195195-GET returns JSON list of records
196196-```json
197197-[{
198198- "Host": string,
199199- "Did": string,
200200- "SSL": bool,
201201- "Cursor": int,
202202- "Registered": bool,
203203- "Blocked": bool,
204204- "RateLimit": float,
205205- "CrawlRateLimit": float,
206206- "RepoCount": int,
207207- "RepoLimit": int,
208208- "HourlyEventLimit": int,
209209- "DailyEventLimit": int,
210210-211211- "HasActiveConnection": bool,
212212- "EventsSeenSinceStartup": int,
213213- "PerSecondEventRate": {"Max": float, "Window": float seconds},
214214- "PerHourEventRate": {"Max": float, "Window": float seconds},
215215- "PerDayEventRate": {"Max": float, "Window": float seconds},
216216- "CrawlRate": {"Max": float, "Window": float seconds},
217217- "UserCount": int,
218218-}, ...]
219219-```
220220-221221-### /admin/pds/changeLimits
222222-223223-POST to set the limits for a PDS. body:
224224-225225-```json
226226-{
227227- "host": string,
228228- "per_second": int,
229229- "per_hour": int,
230230- "per_day": int,
231231- "crawl_rate": int,
232232- "repo_limit": int,
233233-}
234234-```
235235-236236-### /admin/pds/block
237237-238238-POST `?host={host}` to block a PDS
239239-240240-### /admin/pds/unblock
241241-242242-POST `?host={host}` to un-block a PDS
243243-244244-245245-### /admin/pds/addTrustedDomain
246246-247247-POST `?domain={}` to make a domain trusted
248248-249249-### /admin/consumers/list
250250-251251-GET returns list json of clients currently reading from the relay firehose
252252-253253-```json
254254-[{
255255- "id": int,
256256- "remote_addr": string,
257257- "user_agent": string,
258258- "events_consumed": int,
259259- "connected_at": time,
260260-}, ...]
261261-```
···11+package relay
22+33+import (
44+ "context"
55+ "errors"
66+ "fmt"
77+ "log/slog"
88+ "time"
99+1010+ comatproto "github.com/bluesky-social/indigo/api/atproto"
1111+ "github.com/bluesky-social/indigo/atproto/identity"
1212+ "github.com/bluesky-social/indigo/atproto/syntax"
1313+ "github.com/bluesky-social/indigo/cmd/relay/relay/models"
1414+ "github.com/bluesky-social/indigo/cmd/relay/stream"
1515+1616+ "go.opentelemetry.io/otel/attribute"
1717+)
1818+1919+// This callback function gets called by Slurper on every upstream repo stream message from any host.
2020+//
2121+// Messages are processed in-order for a single account on a single host; but may be concurrent or out-of-order for the same account *across* hosts (eg, during account migration or a conflict)
2222+func (r *Relay) processRepoEvent(ctx context.Context, evt *stream.XRPCStreamEvent, hostname string, hostID uint64) error {
2323+ ctx, span := tracer.Start(ctx, "processRepoEvent")
2424+ defer span.End()
2525+2626+ start := time.Now()
2727+ defer func() {
2828+ eventsHandleDuration.WithLabelValues(hostname).Observe(time.Since(start).Seconds())
2929+ }()
3030+3131+ EventsReceivedCounter.WithLabelValues(hostname).Add(1)
3232+3333+ switch {
3434+ case evt.RepoCommit != nil:
3535+ repoCommitsReceivedCounter.WithLabelValues(hostname).Add(1)
3636+ return r.processCommitEvent(ctx, evt.RepoCommit, hostname, hostID)
3737+ case evt.RepoSync != nil:
3838+ repoSyncReceivedCounter.WithLabelValues(hostname).Add(1)
3939+ return r.processSyncEvent(ctx, evt.RepoSync, hostname, hostID)
4040+ case evt.RepoIdentity != nil:
4141+ //repoIdentityReceivedCounter.WithLabelValues(hostname).Add(1)
4242+ return r.processIdentityEvent(ctx, evt.RepoIdentity, hostname, hostID)
4343+ case evt.RepoAccount != nil:
4444+ //repoAccountReceivedCounter.WithLabelValues(hostname).Add(1)
4545+ return r.processAccountEvent(ctx, evt.RepoAccount, hostname, hostID)
4646+ case evt.RepoHandle != nil: // DEPRECATED
4747+ eventsWarningsCounter.WithLabelValues(hostname, "handle").Add(1)
4848+ return nil
4949+ case evt.RepoMigrate != nil: // DEPRECATED
5050+ eventsWarningsCounter.WithLabelValues(hostname, "migrate").Add(1)
5151+ return nil
5252+ case evt.RepoTombstone != nil: // DEPRECATED
5353+ eventsWarningsCounter.WithLabelValues(hostname, "tombstone").Add(1)
5454+ return nil
5555+ default:
5656+ return fmt.Errorf("unhandled repo stream event type")
5757+ }
5858+}
5959+6060+// Implements the shared part of event processing: that the account existing, is associated with this host, etc.
6161+//
6262+// If there is no error, the returned account is always non-nil, but the identity may be nil (if there was a resolution error).
6363+func (r *Relay) preProcessEvent(ctx context.Context, didStr string, hostname string, hostID uint64, logger *slog.Logger) (*models.Account, *identity.Identity, error) {
6464+6565+ did, err := syntax.ParseDID(didStr)
6666+ if err != nil {
6767+ return nil, nil, fmt.Errorf("invalid DID in message: %w", err)
6868+ }
6969+ // TODO: add a test case for non-normalized DID
7070+ did = NormalizeDID(did)
7171+7272+ acc, err := r.GetAccount(ctx, did)
7373+ if err != nil {
7474+ if !errors.Is(err, ErrAccountNotFound) {
7575+ return nil, nil, fmt.Errorf("fetching account: %w", err)
7676+ }
7777+7878+ acc, err = r.CreateAccountHost(ctx, did, hostID, hostname)
7979+ if err != nil {
8080+ return nil, nil, err
8181+ }
8282+ }
8383+8484+ if acc == nil {
8585+ // TODO: this is defensive and could be removed
8686+ return nil, nil, ErrAccountNotFound
8787+ }
8888+8989+ // verify that the account is on the subscribed host (or update if it should be)
9090+ if err := r.EnsureAccountHost(ctx, acc, hostID, hostname); err != nil {
9191+ return nil, nil, err
9292+ }
9393+9494+ // skip identity lookup if account is not active
9595+ if !acc.IsActive() {
9696+ return acc, nil, nil
9797+ }
9898+9999+ ident, err := r.Dir.LookupDID(ctx, did)
100100+ if err != nil {
101101+ logger.Warn("failed to load identity", "did", did, "err", err)
102102+ }
103103+ return acc, ident, nil
104104+}
105105+106106+func (r *Relay) processCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit, hostname string, hostID uint64) error {
107107+ logger := r.Logger.With("did", evt.Repo, "seq", evt.Seq, "host", hostname, "eventType", "commit", "rev", evt.Rev)
108108+ logger.Debug("relay got commit event")
109109+110110+ acc, ident, err := r.preProcessEvent(ctx, evt.Repo, hostname, hostID, logger)
111111+ if err != nil {
112112+ return err
113113+ }
114114+115115+ if !acc.IsActive() {
116116+ logger.Info("dropping commit message for non-active account", "status", acc.Status, "upstreamStatus", acc.UpstreamStatus)
117117+ return nil
118118+ }
119119+120120+ if ident == nil {
121121+ // TODO: what to do if identity resolution fails
122122+ }
123123+124124+ prevRepo, err := r.GetAccountRepo(ctx, acc.UID)
125125+ if err != nil && !errors.Is(err, ErrAccountRepoNotFound) {
126126+ // TODO: should this be a hard error?
127127+ logger.Error("failed to read previous repo state", "err", err)
128128+ }
129129+130130+ // fast check for stale revision (will be re-checked in VerifyRepoCommit)
131131+ if prevRepo != nil && prevRepo.Rev != "" && evt.Rev != "" {
132132+ if evt.Rev <= prevRepo.Rev {
133133+ logger.Warn("dropping commit with old rev", "prevRev", prevRepo.Rev)
134134+ return nil
135135+ }
136136+ }
137137+138138+ // most commit validation happens in this method. Note that is handles lenient/strict modes.
139139+ newRepo, err := r.VerifyRepoCommit(ctx, evt, ident, prevRepo, hostname)
140140+ if err != nil {
141141+ logger.Warn("commit message failed verification", "err", err)
142142+ return err
143143+ }
144144+145145+ err = r.UpsertAccountRepo(acc.UID, syntax.TID(newRepo.Rev), newRepo.CommitCID, newRepo.CommitDataCID)
146146+ if err != nil {
147147+ return fmt.Errorf("failed to upsert account repo (%s): %w", acc.DID, err)
148148+ }
149149+150150+ // emit the event
151151+ // TODO: is this copy important?
152152+ commitCopy := *evt
153153+ err = r.Events.AddEvent(ctx, &stream.XRPCStreamEvent{
154154+ RepoCommit: &commitCopy,
155155+ PrivUid: acc.UID,
156156+ })
157157+ if err != nil {
158158+ logger.Error("failed to broadcast event", "error", err)
159159+ return fmt.Errorf("failed to broadcast #commit event: %w", err)
160160+ }
161161+162162+ return nil
163163+}
164164+165165+func (r *Relay) processSyncEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Sync, hostname string, hostID uint64) error {
166166+ logger := r.Logger.With("did", evt.Did, "seq", evt.Seq, "host", hostname, "eventType", "sync")
167167+ logger.Debug("relay got sync event")
168168+169169+ acc, ident, err := r.preProcessEvent(ctx, evt.Did, hostname, hostID, logger)
170170+ if err != nil {
171171+ return err
172172+ }
173173+174174+ if !acc.IsActive() {
175175+ logger.Info("dropping sync message for non-active account", "status", acc.Status, "upstreamStatus", acc.UpstreamStatus)
176176+ return nil
177177+ }
178178+179179+ if ident == nil {
180180+ // TODO: what to do if identity resolution fails
181181+ }
182182+183183+ // TODO: should we load account 'rev' here and prevent roll-backs? or allow roll-backs?
184184+185185+ newRepo, err := r.VerifyRepoSync(ctx, evt, ident, hostname)
186186+ if err != nil {
187187+ return err
188188+ }
189189+190190+ err = r.UpsertAccountRepo(acc.UID, syntax.TID(newRepo.Rev), newRepo.CommitCID, newRepo.CommitDataCID)
191191+ if err != nil {
192192+ return fmt.Errorf("failed to upsert account repo (%s): %w", acc.DID, err)
193193+ }
194194+195195+ // emit the event
196196+ evtCopy := *evt
197197+ err = r.Events.AddEvent(ctx, &stream.XRPCStreamEvent{
198198+ RepoSync: &evtCopy,
199199+ PrivUid: acc.UID,
200200+ })
201201+ if err != nil {
202202+ logger.Error("failed to broadcast event", "error", err)
203203+ return fmt.Errorf("failed to broadcast #sync event: %w", err)
204204+ }
205205+ return nil
206206+}
207207+208208+func (r *Relay) processIdentityEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity, hostname string, hostID uint64) error {
209209+ logger := r.Logger.With("did", evt.Did, "seq", evt.Seq, "host", hostname, "eventType", "identity")
210210+ logger.Debug("relay got identity event")
211211+212212+ acc, _, err := r.preProcessEvent(ctx, evt.Did, hostname, hostID, logger)
213213+ if err != nil {
214214+ return err
215215+ }
216216+ did := syntax.DID(acc.DID)
217217+218218+ // Flush any cached DID/identity info for this user
219219+ err = r.Dir.Purge(ctx, did.AtIdentifier())
220220+ if err != nil {
221221+ logger.Error("problem purging identity directory cache", "err", err)
222222+ }
223223+224224+ // Broadcast the identity event to all consumers
225225+ err = r.Events.AddEvent(ctx, &stream.XRPCStreamEvent{
226226+ RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{
227227+ Did: did.String(),
228228+ Time: evt.Time, // TODO: update to now?
229229+ Handle: evt.Handle, // TODO: we could substitute in our handle resolution here
230230+ },
231231+ PrivUid: acc.UID,
232232+ })
233233+ if err != nil {
234234+ logger.Error("failed to broadcast identity event", "error", err)
235235+ return fmt.Errorf("failed to broadcast #identity event: %w", err)
236236+ }
237237+238238+ return nil
239239+}
240240+241241+func (r *Relay) processAccountEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Account, hostname string, hostID uint64) error {
242242+ logger := r.Logger.With("did", evt.Did, "seq", evt.Seq, "host", hostname, "eventType", "account")
243243+ logger.Debug("relay got account event")
244244+245245+ ctx, span := tracer.Start(ctx, "processAccountEvent")
246246+ defer span.End()
247247+ span.SetAttributes(
248248+ attribute.String("did", evt.Did),
249249+ attribute.Int64("seq", evt.Seq),
250250+ attribute.Bool("active", evt.Active),
251251+ )
252252+253253+ acc, _, err := r.preProcessEvent(ctx, evt.Did, hostname, hostID, logger)
254254+ if err != nil {
255255+ return err
256256+ }
257257+258258+ if !evt.Active && evt.Status == nil {
259259+ logger.Warn("invalid account event", "active", evt.Active, "status", evt.Status)
260260+ }
261261+262262+ newStatus := models.AccountStatusInactive
263263+ if evt.Active {
264264+ newStatus = models.AccountStatusActive
265265+ } else if evt.Status != nil {
266266+ newStatus = models.AccountStatus(*evt.Status)
267267+ }
268268+269269+ if newStatus != acc.UpstreamStatus {
270270+ if err := r.UpdateAccountUpstreamStatus(ctx, syntax.DID(acc.DID), acc.UID, newStatus); err != nil {
271271+ return err
272272+ }
273273+ acc.UpstreamStatus = newStatus
274274+ }
275275+276276+ // emit the event
277277+ err = r.Events.AddEvent(ctx, &stream.XRPCStreamEvent{
278278+ RepoAccount: &comatproto.SyncSubscribeRepos_Account{
279279+ Active: acc.IsActive(),
280280+ Did: acc.DID,
281281+ Status: acc.StatusField(),
282282+ Time: evt.Time,
283283+ },
284284+ PrivUid: acc.UID,
285285+ })
286286+ if err != nil {
287287+ logger.Error("failed to broadcast event", "error", err)
288288+ return fmt.Errorf("failed to broadcast #account event: %w", err)
289289+ }
290290+291291+ return nil
292292+}
+64
cmd/relay/relay/metrics.go
···11+package relay
22+33+import (
44+ "github.com/prometheus/client_golang/prometheus"
55+ "github.com/prometheus/client_golang/prometheus/promauto"
66+)
77+88+// TODO: expose an accessor instead of exporting
99+var EventsReceivedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
1010+ Name: "events_received_counter",
1111+ Help: "The total number of events received",
1212+}, []string{"pds"})
1313+1414+var eventsWarningsCounter = promauto.NewCounterVec(prometheus.CounterOpts{
1515+ Name: "events_warn_counter",
1616+ Help: "Events received with warnings",
1717+}, []string{"pds", "warn"})
1818+1919+var eventsHandleDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
2020+ Name: "events_handle_duration",
2121+ Help: "A histogram of handleFedEvent latencies",
2222+ Buckets: prometheus.ExponentialBuckets(0.001, 2, 15),
2323+}, []string{"pds"})
2424+2525+var repoCommitsReceivedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
2626+ Name: "repo_commits_received_counter",
2727+ Help: "The total number of commit events received",
2828+}, []string{"pds"})
2929+var repoSyncReceivedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
3030+ Name: "repo_sync_received_counter",
3131+ Help: "The total number of sync events received",
3232+}, []string{"pds"})
3333+3434+var eventsSentCounter = promauto.NewCounterVec(prometheus.CounterOpts{
3535+ Name: "events_sent_counter",
3636+ Help: "The total number of events sent to consumers",
3737+}, []string{"remote_addr", "user_agent"})
3838+3939+/* NOTE: not implemented in this version of relay
4040+var externalUserCreationAttempts = promauto.NewCounter(prometheus.CounterOpts{
4141+ Name: "relay_external_user_creation_attempts",
4242+ Help: "The total number of external users created",
4343+})
4444+*/
4545+4646+var newUsersDiscovered = promauto.NewCounter(prometheus.CounterOpts{
4747+ Name: "relay_new_users_discovered",
4848+ Help: "The total number of new users discovered directly from the firehose (not from refs)",
4949+})
5050+5151+/* NOTE: not implemented in this version of relay
5252+var newUserDiscoveryDuration = promauto.NewHistogram(prometheus.HistogramOpts{
5353+ Name: "relay_new_user_discovery_duration",
5454+ Help: "A histogram of new user discovery latencies",
5555+ Buckets: prometheus.ExponentialBuckets(0.001, 2, 15),
5656+})
5757+*/
5858+5959+/* NOTE: not implemented
6060+var accountVerifyWarnings = promauto.NewCounterVec(prometheus.CounterOpts{
6161+ Name: "validator_account_verify_warnings",
6262+ Help: "things that have been a little bit wrong with account messages",
6363+}, []string{"host", "warn"})
6464+*/
+11
cmd/relay/relay/metrics_slurper.go
···11+package relay
22+33+import (
44+ "github.com/prometheus/client_golang/prometheus"
55+ "github.com/prometheus/client_golang/prometheus/promauto"
66+)
77+88+var connectedInbound = promauto.NewGauge(prometheus.GaugeOpts{
99+ Name: "relay_connected_inbound",
1010+ Help: "Number of inbound firehoses we are consuming",
1111+})
+48
cmd/relay/relay/models/methods.go
···11+package models
22+33+import (
44+ "fmt"
55+)
66+77+// returns base HTTP URL for the host: scheme, hostname, optional port, no path segment
88+func (h *Host) BaseURL() string {
99+ scheme := "https"
1010+ if h.NoSSL {
1111+ scheme = "http"
1212+ }
1313+ return fmt.Sprintf("%s://%s", scheme, h.Hostname)
1414+}
1515+1616+// returns websocket URL for the host: scheme, hostname, optional port, and path.
1717+func (h *Host) SubscribeReposURL() string {
1818+ scheme := "wss"
1919+ if h.NoSSL {
2020+ scheme = "ws"
2121+ }
2222+ return fmt.Sprintf("%s://%s/xrpc/com.atproto.sync.subscribeRepos", scheme, h.Hostname)
2323+}
2424+2525+func (a *Account) AccountStatus() AccountStatus {
2626+ if a.Status != AccountStatusActive {
2727+ if a.Status == AccountStatusHostThrottled {
2828+ return AccountStatusThrottled
2929+ }
3030+ return a.Status
3131+ }
3232+ return a.UpstreamStatus
3333+}
3434+3535+// Returns a pointer to a copy of status string; or nil if status is active.
3636+//
3737+// Helpful for account info responses which have a boolean 'active' and optional 'status' field (like the #account message)
3838+func (a *Account) StatusField() *string {
3939+ if a.IsActive() {
4040+ return nil
4141+ }
4242+ s := string(a.AccountStatus())
4343+ return &s
4444+}
4545+4646+func (a *Account) IsActive() bool {
4747+ return (a.Status == AccountStatusActive || a.Status == AccountStatusThrottled) && a.UpstreamStatus == AccountStatusActive
4848+}
···11+package models
22+33+import (
44+ "time"
55+)
66+77+type DomainBan struct {
88+ ID uint64 `gorm:"column:id;primarykey"`
99+ // CreatedAt is automatically managed by gorm (by convention)
1010+ CreatedAt time.Time
1111+1212+ Domain string `gorm:"unique"`
1313+}
1414+1515+type HostStatus string
1616+1717+const (
1818+ HostStatusActive = HostStatus("active")
1919+ HostStatusIdle = HostStatus("idle")
2020+ HostStatusOffline = HostStatus("offline")
2121+ HostStatusThrottled = HostStatus("throttled")
2222+ HostStatusBanned = HostStatus("banned")
2323+)
2424+2525+type Host struct {
2626+ ID uint64 `gorm:"column:id;primarykey"`
2727+2828+ // these fields are automatically managed by gorm (by convention)
2929+ CreatedAt time.Time
3030+ UpdatedAt time.Time
3131+3232+ // hostname, without URL scheme. if localhost, must include a port number; otherwise must not include port
3333+ Hostname string `gorm:"column:hostname;uniqueIndex;not null"`
3434+3535+ // indicates ws:// not wss://
3636+ NoSSL bool `gorm:"column:no_ssl;default:false"`
3737+3838+ // maximum number of active accounts
3939+ AccountLimit int64 `gorm:"column:account_limit"`
4040+4141+ // indicates this is a highly trusted host (PDS), and different rate limits apply
4242+ Trusted bool `gorm:"column:trusted;default:false"`
4343+4444+ Status HostStatus `gorm:"column:status;default:active"`
4545+4646+ // the last sequence number persisted for this host. updated periodically, and at shutdown. negative number indicates no sequence recorded
4747+ LastSeq int64 `gorm:"column:last_seq;default:-1"`
4848+4949+ // represents the number of accounts on the host, minus any in "deleted" state
5050+ AccountCount int64 `gorm:"column:account_count;default:0"`
5151+}
5252+5353+func (Host) TableName() string {
5454+ return "host"
5555+}
5656+5757+type AccountStatus string
5858+5959+var (
6060+ // AccountStatusActive is not in the spec but used internally
6161+ AccountStatusActive = AccountStatus("active")
6262+6363+ AccountStatusDeactivated = AccountStatus("deactivated")
6464+ AccountStatusDeleted = AccountStatus("deleted")
6565+ AccountStatusDesynchronized = AccountStatus("desynchronized")
6666+ AccountStatusSuspended = AccountStatus("suspended")
6767+ AccountStatusTakendown = AccountStatus("takendown")
6868+ AccountStatusThrottled = AccountStatus("throttled")
6969+ AccountStatusHostThrottled = AccountStatus("host-throttled") // TODO: not yet implemented
7070+7171+ // generic "not active, but not known" status
7272+ AccountStatusInactive = AccountStatus("inactive")
7373+)
7474+7575+type Account struct {
7676+ UID uint64 `gorm:"column:uid;primarykey"`
7777+ DID string `gorm:"column:did;uniqueIndex;not null"`
7878+7979+ // this is a reference to the ID field on Host; but it is not an explicit foreign key
8080+ HostID uint64 `gorm:"column:host_id;not null"`
8181+ Status AccountStatus `gorm:"column:status;not null;default:active"`
8282+ UpstreamStatus AccountStatus `gorm:"column:upstream_status;not null;default:active"`
8383+}
8484+8585+func (Account) TableName() string {
8686+ return "account"
8787+}
8888+8989+// This is a small extension table to `Account`, which holds fast-changing fields updated on every firehose event.
9090+type AccountRepo struct {
9191+ // references Account.UID, but not set up as a foreign key
9292+ UID uint64 `gorm:"column:uid;primarykey"`
9393+ Rev string `gorm:"column:rev;not null"`
9494+9595+ // The CID of the entire signed commit block. Sometimes called the "head"
9696+ CommitCID string `gorm:"column:commit_cid;not null"`
9797+9898+ // The CID of the top of the repo MST, which is the 'data' field within the commit block. This becomes 'prevData'
9999+ CommitDataCID string `gorm:"column:commit_data_cid;not null"`
100100+}
101101+102102+func (AccountRepo) TableName() string {
103103+ return "account_repo"
104104+}
+134
cmd/relay/relay/relay.go
···11+package relay
22+33+import (
44+ "log/slog"
55+ "sync"
66+77+ "github.com/bluesky-social/indigo/atproto/identity"
88+ "github.com/bluesky-social/indigo/cmd/relay/relay/models"
99+ "github.com/bluesky-social/indigo/cmd/relay/stream/eventmgr"
1010+1111+ "github.com/RussellLuo/slidingwindow"
1212+ "github.com/hashicorp/golang-lru/v2"
1313+ "go.opentelemetry.io/otel"
1414+ "gorm.io/gorm"
1515+)
1616+1717+var tracer = otel.Tracer("relay")
1818+1919+type Relay struct {
2020+ db *gorm.DB
2121+ Dir identity.Directory
2222+ Logger *slog.Logger
2323+ Slurper *Slurper
2424+ Events *eventmgr.EventManager
2525+ HostChecker HostChecker
2626+ Config RelayConfig
2727+2828+ // Management of Socket Consumers
2929+ consumersLk sync.RWMutex
3030+ nextConsumerID uint64
3131+ consumers map[uint64]*SocketConsumer
3232+3333+ // Account cache
3434+ accountCache *lru.Cache[string, *models.Account]
3535+3636+ HostPerDayLimiter *slidingwindow.Limiter
3737+}
3838+3939+type RelayConfig struct {
4040+ UserAgent string
4141+ DefaultRepoLimit int64
4242+ TrustedRepoLimit int64
4343+ ConcurrencyPerHost int
4444+ LenientSyncValidation bool
4545+ TrustedDomains []string
4646+ HostPerDayLimit int64
4747+4848+ // If true, skip validation that messages for a given account (DID) are coming from the expected upstream host (PDS). Currently only used in tests; might be used for intermediate relays in the future.
4949+ SkipAccountHostCheck bool
5050+}
5151+5252+func DefaultRelayConfig() *RelayConfig {
5353+ // NOTE: many of these defaults are clobbered by CLI arguments
5454+ return &RelayConfig{
5555+ UserAgent: "indigo-relay",
5656+ DefaultRepoLimit: 100,
5757+ TrustedRepoLimit: 10_000_000,
5858+ ConcurrencyPerHost: 40,
5959+ HostPerDayLimit: 50,
6060+ }
6161+}
6262+6363+func NewRelay(db *gorm.DB, evtman *eventmgr.EventManager, dir identity.Directory, config *RelayConfig) (*Relay, error) {
6464+6565+ if config == nil {
6666+ config = DefaultRelayConfig()
6767+ }
6868+6969+ uc, _ := lru.New[string, *models.Account](2_000_000)
7070+7171+ hc := NewHostClient(config.UserAgent)
7272+7373+ // NOTE: discarded second argument is not an `error` type
7474+7575+ r := &Relay{
7676+ db: db,
7777+ Dir: dir,
7878+ Logger: slog.Default().With("system", "relay"),
7979+ Events: evtman,
8080+ HostChecker: hc,
8181+ Config: *config,
8282+8383+ consumersLk: sync.RWMutex{},
8484+ consumers: make(map[uint64]*SocketConsumer),
8585+8686+ accountCache: uc,
8787+8888+ HostPerDayLimiter: perDayLimiter(config.HostPerDayLimit),
8989+ }
9090+9191+ if err := r.MigrateDatabase(); err != nil {
9292+ return nil, err
9393+ }
9494+9595+ slurpConfig := DefaultSlurperConfig()
9696+ slurpConfig.ConcurrencyPerHost = config.ConcurrencyPerHost
9797+9898+ // register callbacks to persist cursors and host state in database
9999+ slurpConfig.PersistCursorCallback = r.PersistHostCursors
100100+ slurpConfig.PersistHostStatusCallback = r.UpdateHostStatus
101101+102102+ s, err := NewSlurper(r.processRepoEvent, slurpConfig, r.Logger)
103103+ if err != nil {
104104+ return nil, err
105105+ }
106106+ r.Slurper = s
107107+108108+ // TODO: should this happen in a separate "start" method, instead of "NewRelay()"?
109109+ if err := r.ResubscribeAllHosts(); err != nil {
110110+ return nil, err
111111+ }
112112+ return r, nil
113113+}
114114+115115+func (r *Relay) MigrateDatabase() error {
116116+ if err := r.db.AutoMigrate(models.DomainBan{}); err != nil {
117117+ return err
118118+ }
119119+ if err := r.db.AutoMigrate(models.Host{}); err != nil {
120120+ return err
121121+ }
122122+ if err := r.db.AutoMigrate(models.Account{}); err != nil {
123123+ return err
124124+ }
125125+ if err := r.db.AutoMigrate(models.AccountRepo{}); err != nil {
126126+ return err
127127+ }
128128+ return nil
129129+}
130130+131131+// simple check of connection to database
132132+func (r *Relay) Healthcheck() error {
133133+ return r.db.Exec("SELECT 1").Error
134134+}
+544
cmd/relay/relay/slurper.go
···11+package relay
22+33+import (
44+ "context"
55+ "errors"
66+ "fmt"
77+ "log/slog"
88+ "math/rand"
99+ "net/http"
1010+ "sync"
1111+ "sync/atomic"
1212+ "time"
1313+1414+ comatproto "github.com/bluesky-social/indigo/api/atproto"
1515+ "github.com/bluesky-social/indigo/cmd/relay/relay/models"
1616+ "github.com/bluesky-social/indigo/cmd/relay/stream"
1717+ "github.com/bluesky-social/indigo/cmd/relay/stream/schedulers/parallel"
1818+1919+ "github.com/RussellLuo/slidingwindow"
2020+ "github.com/gorilla/websocket"
2121+)
2222+2323+// TODO: this isn't actually getting setup or used?
2424+var EventsTimeout = time.Minute
2525+2626+type ProcessMessageFunc func(ctx context.Context, evt *stream.XRPCStreamEvent, hostname string, hostID uint64) error
2727+type PersistCursorFunc func(ctx context.Context, cursors *[]HostCursor) error
2828+type PersistHostStatusFunc func(ctx context.Context, hostID uint64, state models.HostStatus) error
2929+3030+// `Slurper` is the sub-system of the relay which manages active websocket firehose connections to upstream hosts (eg, PDS instances).
3131+//
3232+// It configures rate-limits, tracks cursors, and retries connections. It passes received messages on to the main relay via a callback function. `Slurper` does not talk to the database directly, but does have some callback to persist host state (cursors and hosting status for some error conditions).
3333+type Slurper struct {
3434+ processCallback ProcessMessageFunc
3535+ Config *SlurperConfig
3636+3737+ subsLk sync.Mutex
3838+ subs map[string]*Subscription
3939+4040+ shutdownChan chan bool
4141+ shutdownResult chan error
4242+4343+ logger *slog.Logger
4444+}
4545+4646+type SlurperConfig struct {
4747+ UserAgent string
4848+ ConcurrencyPerHost int
4949+ QueueDepthPerHost int
5050+ PersistCursorPeriod time.Duration
5151+5252+ BaselinePerSecondLimit int64
5353+ BaselinePerHourLimit int64
5454+ BaselinePerDayLimit int64
5555+ TrustedPerSecondLimit int64
5656+ TrustedPerHourLimit int64
5757+ TrustedPerDayLimit int64
5858+5959+ // callback functions. technically optional but effectively required
6060+ PersistCursorCallback PersistCursorFunc
6161+ PersistHostStatusCallback PersistHostStatusFunc
6262+}
6363+6464+func DefaultSlurperConfig() *SlurperConfig {
6565+ // NOTE: many of these defaults are overruled by DefaultRelayConfig, or even process CLI arg defaults
6666+ return &SlurperConfig{
6767+ UserAgent: "indigo-relay",
6868+ ConcurrencyPerHost: 40,
6969+ // NOTE: queue depth doesn't do anything with current parallel scheduler implementation
7070+ QueueDepthPerHost: 1000,
7171+ PersistCursorPeriod: time.Second * 4,
7272+7373+ // these are the minimum event rates for regular public hosts
7474+ BaselinePerSecondLimit: 50,
7575+ BaselinePerHourLimit: 2500,
7676+ BaselinePerDayLimit: 20_000,
7777+7878+ // these are the fixed event rates for trusted hosts (eg, same service provider as relay)
7979+ TrustedPerSecondLimit: 5_000,
8080+ TrustedPerHourLimit: 50_000_000,
8181+ TrustedPerDayLimit: 500_000_000,
8282+ }
8383+}
8484+8585+// represents an active client connection to a remote host
8686+type Subscription struct {
8787+ Hostname string
8888+ HostID uint64
8989+ LastSeq atomic.Int64
9090+ Limiters *StreamLimiters
9191+9292+ scheduler *parallel.Scheduler
9393+ lk sync.RWMutex
9494+ ctx context.Context
9595+ cancel func()
9696+}
9797+9898+// pulls lastSeq from underlying scheduler in to this Subscription
9999+func (sub *Subscription) UpdateSeq() {
100100+ sub.LastSeq.Store(sub.scheduler.LastSeq())
101101+}
102102+103103+func (sub *Subscription) HostCursor() HostCursor {
104104+ sub.lk.Lock()
105105+ defer sub.lk.Unlock()
106106+ return HostCursor{
107107+ HostID: sub.HostID,
108108+ LastSeq: sub.LastSeq.Load(),
109109+ }
110110+}
111111+112112+type StreamLimiterCounts struct {
113113+ PerSecond int64
114114+ PerHour int64
115115+ PerDay int64
116116+}
117117+118118+type StreamLimiters struct {
119119+ PerSecond *slidingwindow.Limiter
120120+ PerHour *slidingwindow.Limiter
121121+ PerDay *slidingwindow.Limiter
122122+}
123123+124124+func (sl *StreamLimiters) Counts() StreamLimiterCounts {
125125+ return StreamLimiterCounts{
126126+ PerSecond: sl.PerSecond.Limit(),
127127+ PerHour: sl.PerHour.Limit(),
128128+ PerDay: sl.PerDay.Limit(),
129129+ }
130130+}
131131+132132+func NewSlurper(processCallback ProcessMessageFunc, config *SlurperConfig, logger *slog.Logger) (*Slurper, error) {
133133+ if processCallback == nil {
134134+ return nil, fmt.Errorf("processCallback is required")
135135+ }
136136+ if config == nil {
137137+ config = DefaultSlurperConfig()
138138+ }
139139+ if logger == nil {
140140+ logger = slog.Default()
141141+ }
142142+143143+ logger = logger.With("system", "slurper")
144144+ s := &Slurper{
145145+ processCallback: processCallback,
146146+ Config: config,
147147+ subs: make(map[string]*Subscription),
148148+ shutdownChan: make(chan bool),
149149+ shutdownResult: make(chan error),
150150+ logger: logger,
151151+ }
152152+153153+ // Start a goroutine to persist cursors (both periodically and and on shutdown)
154154+ go func() {
155155+ for {
156156+ select {
157157+ case <-s.shutdownChan:
158158+ s.logger.Info("starting shutdown host cursor flush")
159159+ s.shutdownResult <- s.persistCursors(context.Background())
160160+ return
161161+ case <-time.After(config.PersistCursorPeriod):
162162+ if err := s.persistCursors(context.Background()); err != nil {
163163+ s.logger.Error("failed to flush cursors", "err", err)
164164+ }
165165+ }
166166+ }
167167+ }()
168168+169169+ return s, nil
170170+}
171171+172172+func windowFunc() (slidingwindow.Window, slidingwindow.StopFunc) {
173173+ return slidingwindow.NewLocalWindow()
174174+}
175175+176176+func (s *Slurper) ComputeLimiterCounts(accountLimit int64, trusted bool) StreamLimiterCounts {
177177+ if trusted {
178178+ return StreamLimiterCounts{
179179+ PerSecond: s.Config.TrustedPerSecondLimit,
180180+ PerHour: s.Config.TrustedPerHourLimit,
181181+ PerDay: s.Config.TrustedPerDayLimit,
182182+ }
183183+ }
184184+ return StreamLimiterCounts{
185185+ PerSecond: s.Config.BaselinePerSecondLimit + (accountLimit / 1000),
186186+ PerHour: s.Config.BaselinePerHourLimit + accountLimit,
187187+ PerDay: s.Config.BaselinePerDayLimit + accountLimit*10,
188188+ }
189189+}
190190+191191+func (s *Slurper) UpdateLimiters(hostname string, accountLimit int64, trusted bool) error {
192192+193193+ newLims := s.ComputeLimiterCounts(accountLimit, trusted)
194194+195195+ s.subsLk.Lock()
196196+ defer s.subsLk.Unlock()
197197+198198+ sub, ok := s.subs[hostname]
199199+ if !ok {
200200+ return fmt.Errorf("updating limits for %s: %w", hostname, ErrNoActiveConnection)
201201+ }
202202+203203+ sub.Limiters.PerSecond.SetLimit(newLims.PerSecond)
204204+ sub.Limiters.PerHour.SetLimit(newLims.PerHour)
205205+ sub.Limiters.PerDay.SetLimit(newLims.PerDay)
206206+207207+ return nil
208208+}
209209+210210+func (s *Slurper) GetLimits(hostname string) (*StreamLimiterCounts, error) {
211211+ s.subsLk.Lock()
212212+ defer s.subsLk.Unlock()
213213+214214+ sub, ok := s.subs[hostname]
215215+ if !ok {
216216+ return nil, fmt.Errorf("reading limits for %s: %w", hostname, ErrNoActiveConnection)
217217+ }
218218+219219+ slc := sub.Limiters.Counts()
220220+ return &slc, nil
221221+}
222222+223223+// Shutdown shuts down the entire Slurper (all subscriptions)
224224+func (s *Slurper) Shutdown() error {
225225+ s.shutdownChan <- true
226226+ s.logger.Info("waiting for slurper shutdown")
227227+ err := <-s.shutdownResult
228228+ if err != nil {
229229+ s.logger.Error("shutdown error", "err", err)
230230+ }
231231+ s.logger.Info("slurper shutdown complete")
232232+ return err
233233+}
234234+235235+func (s *Slurper) CheckIfSubscribed(hostname string) bool {
236236+ s.subsLk.Lock()
237237+ defer s.subsLk.Unlock()
238238+239239+ _, ok := s.subs[hostname]
240240+ return ok
241241+}
242242+243243+// high-level entry point for opening a subscription (websocket connection). This might be called when adding a new host, or when re-connecting to a previously subscribed host.
244244+//
245245+// NOTE: the `host` parameter (a database row) contains metadata about the host at a point in time. Subsequent changes to the database aren't reflected in that struct, and changes to the struct don't get persisted to database.
246246+func (s *Slurper) Subscribe(host *models.Host, newHost bool) error {
247247+ // TODO: replace newHost with a check for negative number on host.LastSeq (via IsNewHost helper method on `models.Host`?)
248248+ s.subsLk.Lock()
249249+ defer s.subsLk.Unlock()
250250+251251+ _, ok := s.subs[host.Hostname]
252252+ if ok {
253253+ return fmt.Errorf("already subscribed: %s", host.Hostname)
254254+ }
255255+256256+ counts := s.ComputeLimiterCounts(host.AccountLimit, host.Trusted)
257257+ perSec, _ := slidingwindow.NewLimiter(time.Second, counts.PerSecond, windowFunc)
258258+ perHour, _ := slidingwindow.NewLimiter(time.Hour, counts.PerHour, windowFunc)
259259+ perDay, _ := slidingwindow.NewLimiter(time.Hour*24, counts.PerDay, windowFunc)
260260+ limiters := &StreamLimiters{
261261+ PerSecond: perSec,
262262+ PerHour: perHour,
263263+ PerDay: perDay,
264264+ }
265265+266266+ ctx, cancel := context.WithCancel(context.Background())
267267+ sub := Subscription{
268268+ Hostname: host.Hostname,
269269+ HostID: host.ID,
270270+ Limiters: limiters,
271271+ ctx: ctx,
272272+ cancel: cancel,
273273+ }
274274+ sub.LastSeq.Store(host.LastSeq)
275275+ s.subs[host.Hostname] = &sub
276276+277277+ go s.subscribeWithRedialer(ctx, host, &sub, newHost)
278278+279279+ return nil
280280+}
281281+282282+// Main event-loop for a subscription (websocket connection to upstream host), expected to be called as a goroutine.
283283+//
284284+// On connection failure (drop or failed initial connection), will attempt re-connects, with backoff.
285285+func (s *Slurper) subscribeWithRedialer(ctx context.Context, host *models.Host, sub *Subscription, newHost bool) {
286286+ defer func() {
287287+ s.subsLk.Lock()
288288+ defer s.subsLk.Unlock()
289289+290290+ delete(s.subs, host.Hostname)
291291+ }()
292292+293293+ d := websocket.Dialer{
294294+ HandshakeTimeout: time.Second * 5,
295295+ }
296296+297297+ cursor := host.LastSeq
298298+299299+ connectedInbound.Inc()
300300+ defer connectedInbound.Dec()
301301+ // TODO: add a metric for number of subscriptions which are attempting to reconnect
302302+303303+ var backoff int
304304+ for {
305305+ select {
306306+ case <-ctx.Done():
307307+ return
308308+ default:
309309+ }
310310+311311+ u := host.SubscribeReposURL()
312312+ if !newHost {
313313+ u = fmt.Sprintf("%s?cursor=%d", u, cursor)
314314+ }
315315+ hdr := make(http.Header)
316316+ hdr.Add("User-Agent", s.Config.UserAgent)
317317+ conn, res, err := d.DialContext(ctx, u, hdr)
318318+ if err != nil {
319319+ s.logger.Warn("dialing failed", "host", host.Hostname, "err", err, "backoff", backoff)
320320+ time.Sleep(sleepForBackoff(backoff))
321321+ backoff++
322322+323323+ if backoff > 15 {
324324+ s.logger.Warn("host does not appear to be online, disabling for now", "host", host.Hostname)
325325+ if err := s.Config.PersistHostStatusCallback(ctx, sub.HostID, models.HostStatusOffline); err != nil {
326326+ s.logger.Error("failed mark host as stale", "hostname", sub.Hostname, "err", err)
327327+ }
328328+ return
329329+ }
330330+331331+ continue
332332+ }
333333+334334+ s.logger.Info("event subscription response", "code", res.StatusCode, "url", u)
335335+336336+ curCursor := cursor
337337+ if err := s.handleConnection(ctx, conn, &cursor, sub); err != nil {
338338+ if errors.Is(err, ErrTimeoutShutdown) {
339339+ s.logger.Info("shutting down host subscription after timeout", "host", host.Hostname, "time", EventsTimeout.String())
340340+ return
341341+ }
342342+ s.logger.Warn("connection to failed", "host", host.Hostname, "err", err)
343343+ // TODO: measure the last N connection error times and if they're coming too fast reconnect slower or don't reconnect and wait for requestCrawl
344344+ }
345345+346346+ if cursor > curCursor {
347347+ backoff = 0
348348+ }
349349+ }
350350+}
351351+352352+func sleepForBackoff(b int) time.Duration {
353353+ if b == 0 {
354354+ return 0
355355+ }
356356+357357+ if b < 10 {
358358+ return (time.Duration(b) * 2) + (time.Millisecond * time.Duration(rand.Intn(1000)))
359359+ }
360360+361361+ return time.Second * 30
362362+}
363363+364364+// Configures event processing for a websocket connection, using the parallel schedule helper library, with all events processed using the configured callback function.
365365+func (s *Slurper) handleConnection(ctx context.Context, conn *websocket.Conn, lastCursor *int64, sub *Subscription) error {
366366+ ctx, cancel := context.WithCancel(ctx)
367367+ defer cancel()
368368+369369+ rsc := &stream.RepoStreamCallbacks{
370370+ RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
371371+ logger := s.logger.With("host", sub.Hostname, "did", evt.Repo, "seq", evt.Seq, "eventType", "commit")
372372+ logger.Debug("got remote repo event")
373373+ if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoCommit: evt}, sub.Hostname, sub.HostID); err != nil {
374374+ logger.Error("failed handling event", "err", err)
375375+ }
376376+ sub.UpdateSeq()
377377+378378+ return nil
379379+ },
380380+ RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error {
381381+ logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "sync")
382382+ logger.Debug("commit event")
383383+ if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoSync: evt}, sub.Hostname, sub.HostID); err != nil {
384384+ s.logger.Error("failed handling event", "err", err)
385385+ }
386386+ sub.UpdateSeq()
387387+388388+ return nil
389389+ },
390390+ RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error {
391391+ logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "identity")
392392+ logger.Debug("identity event")
393393+ if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoIdentity: evt}, sub.Hostname, sub.HostID); err != nil {
394394+ logger.Error("failed handling event", "err", err)
395395+ }
396396+ sub.UpdateSeq()
397397+398398+ return nil
399399+ },
400400+ RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error {
401401+ logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "account")
402402+ s.logger.Debug("account event")
403403+ if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoAccount: evt}, sub.Hostname, sub.HostID); err != nil {
404404+ logger.Error("failed handling event", "err", err)
405405+ }
406406+ sub.UpdateSeq()
407407+408408+ return nil
409409+ },
410410+ Error: func(evt *stream.ErrorFrame) error {
411411+ // TODO: verbose logging
412412+ switch evt.Error {
413413+ case "FutureCursor":
414414+ // TODO: need test coverage for this code path (including re-connect)
415415+ // if we get a FutureCursor frame, reset our sequence number for this host
416416+ if s.Config.PersistCursorCallback != nil {
417417+ hc := []HostCursor{sub.HostCursor()}
418418+ if err := s.Config.PersistCursorCallback(context.Background(), &hc); err != nil {
419419+ s.logger.Error("failed to reset cursor for host which sent FutureCursor error message", "hostname", sub.Hostname, "err", err)
420420+ }
421421+ } else {
422422+ s.logger.Warn("skipping FutureCursor fix because PersistCursorCallback registered", "hostname", sub.Hostname)
423423+ }
424424+ *lastCursor = 0
425425+ // TODO: should this really return an error?
426426+ return fmt.Errorf("got FutureCursor frame, reset cursor tracking for host")
427427+ default:
428428+ return fmt.Errorf("error frame: %s: %s", evt.Error, evt.Message)
429429+ }
430430+ },
431431+ RepoInfo: func(info *comatproto.SyncSubscribeRepos_Info) error {
432432+ s.logger.Debug("info event", "name", info.Name, "message", info.Message, "host", sub.Hostname)
433433+ return nil
434434+ },
435435+ RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { // DEPRECATED
436436+ logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "handle")
437437+ logger.Debug("got remote handle update event", "handle", evt.Handle)
438438+ if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoHandle: evt}, sub.Hostname, sub.HostID); err != nil {
439439+ logger.Error("failed handling event", "err", err)
440440+ }
441441+ sub.UpdateSeq()
442442+ return nil
443443+ },
444444+ RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error { // DEPRECATED
445445+ logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "migrate")
446446+ logger.Debug("got remote repo migrate event", "migrateTo", evt.MigrateTo)
447447+ if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoMigrate: evt}, sub.Hostname, sub.HostID); err != nil {
448448+ logger.Error("failed handling event", "err", err)
449449+ }
450450+ sub.UpdateSeq()
451451+ return nil
452452+ },
453453+ RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error { // DEPRECATED
454454+ logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "tombstone")
455455+ logger.Debug("got remote repo tombstone event")
456456+ if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoTombstone: evt}, sub.Hostname, sub.HostID); err != nil {
457457+ logger.Error("failed handling event", "err", err)
458458+ }
459459+ sub.UpdateSeq()
460460+ return nil
461461+ },
462462+ }
463463+464464+ limiters := []*slidingwindow.Limiter{
465465+ sub.Limiters.PerSecond,
466466+ sub.Limiters.PerHour,
467467+ sub.Limiters.PerDay,
468468+ }
469469+470470+ // NOTE: `InstrumentedRepoStreamCallbacks` is where event limiters get called/enforced
471471+ instrumentedRSC := stream.NewInstrumentedRepoStreamCallbacks(limiters, rsc.EventHandler)
472472+473473+ sub.scheduler = parallel.NewScheduler(
474474+ s.Config.ConcurrencyPerHost,
475475+ s.Config.QueueDepthPerHost,
476476+ conn.RemoteAddr().String(),
477477+ instrumentedRSC.EventHandler,
478478+ )
479479+ connLogger := s.logger.With("host", sub.Hostname)
480480+ return stream.HandleRepoStream(ctx, conn, sub.scheduler, connLogger)
481481+}
482482+483483+type HostCursor struct {
484484+ HostID uint64
485485+ LastSeq int64
486486+}
487487+488488+// persistCursors sends all cursors to callback to be persisted in database (if registered)
489489+func (s *Slurper) persistCursors(ctx context.Context) error {
490490+ if s.Config.PersistCursorCallback == nil {
491491+ s.logger.Warn("skipping cursor persist because no PersistCursorCallback registered")
492492+ return nil
493493+ }
494494+ start := time.Now()
495495+496496+ // gather cursors: lock overall set, then lock each individual subscription while gathering
497497+ s.subsLk.Lock()
498498+ cursors := make([]HostCursor, len(s.subs))
499499+ i := 0
500500+ for _, sub := range s.subs {
501501+ cursors[i] = HostCursor{
502502+ HostID: sub.HostID,
503503+ LastSeq: sub.LastSeq.Load(),
504504+ }
505505+ i++
506506+ }
507507+ s.subsLk.Unlock()
508508+509509+ err := s.Config.PersistCursorCallback(ctx, &cursors)
510510+ s.logger.Info("finished persisting cursors", "count", len(cursors), "duration", time.Since(start).String(), "err", err)
511511+ return err
512512+}
513513+514514+// gets a snapshot of current subsription hostnames
515515+func (s *Slurper) GetActiveSubHostnames() []string {
516516+ s.subsLk.Lock()
517517+ defer s.subsLk.Unlock()
518518+519519+ var keys []string
520520+ for k := range s.subs {
521521+ keys = append(keys, k)
522522+ }
523523+ return keys
524524+}
525525+526526+func (s *Slurper) KillUpstreamConnection(hostname string, ban bool) error {
527527+ s.subsLk.Lock()
528528+ defer s.subsLk.Unlock()
529529+530530+ sub, ok := s.subs[hostname]
531531+ if !ok {
532532+ return fmt.Errorf("killing connection %q: %w", hostname, ErrNoActiveConnection)
533533+ }
534534+ sub.cancel()
535535+ // cleanup in the run thread subscribeWithRedialer() will delete(s.active, host)
536536+537537+ if ban && s.Config.PersistHostStatusCallback != nil {
538538+ if err := s.Config.PersistHostStatusCallback(context.TODO(), sub.HostID, models.HostStatusBanned); err != nil {
539539+ return fmt.Errorf("failed to set host as banned: %w", err)
540540+ }
541541+ }
542542+543543+ return nil
544544+}