···1515 "gorm.io/gorm"
1616)
17171818-// this function with exact name and args implements the `diskpersist.UidSource` interface
1919-func (r *Relay) DidToUid(ctx context.Context, did string) (uint64, error) {
2020- // NOTE: assuming DID is correct syntax (this is usually "loop back")
2121- xu, err := r.GetAccount(ctx, syntax.DID(did))
2222- if err != nil {
2323- return 0, err
2424- }
2525- if xu == nil {
2626- return 0, ErrAccountNotFound
2727- }
2828- return xu.UID, nil
2929-}
3030-3118func (r *Relay) GetAccount(ctx context.Context, did syntax.DID) (*models.Account, error) {
3232- ctx, span := tracer.Start(ctx, "getAccount")
1919+ ctx, span := tracer.Start(ctx, "GetAccount")
3320 defer span.End()
34213535- /* XXX
3636- cu, ok := r.accountCache.Get(did)
2222+ // first try cache
2323+ a, ok := r.accountCache.Get(did.String())
3724 if ok {
3838- return cu, nil
2525+ return a, nil
3926 }
4040- */
41274228 var acc models.Account
4329 if err := r.db.Where("did = ?", did).First(&acc).Error; err != nil {
···4733 return nil, err
4834 }
49355050- // TODO: is this further check needed?
3636+ // TODO: is this zero UID check redundant?
5137 if acc.UID == 0 {
5238 return nil, ErrAccountNotFound
5339 }
54405555- /* XXX:
5656- r.accountCache.Add(did, &u)
5757- */
4141+ r.accountCache.Add(did.String(), &acc)
58425943 return &acc, nil
6044}
···7862 account, err := r.syncHostAccount(ctx, did, host, nil)
7963 newUserDiscoveryDuration.Observe(time.Since(start).Seconds())
8064 if err != nil {
8181- repoCommitsResultCounter.WithLabelValues(host.Hostname, "uerr").Inc()
8265 return nil, fmt.Errorf("fed event create external user: %w", err)
8366 }
8467 return account, nil
···148131 // we got an event from a non-canonical Host (an intermediate relay)
149132 // a non-canonical Host we haven't seen before; ping it to make sure it's real
150133 // TODO: what do we actually want to track about the source we immediately got this message from vs the canonical Host?
151151- r.Logger.Warn("pds discovered in new user flow", "pds", durl.String(), "did", did)
134134+ r.Logger.Warn("new host discovered in create user flow", "pds", durl.String(), "did", did)
152135153136 err = r.HostChecker.CheckHost(ctx, durl.String())
154137 if err != nil {
···183166 }
184167185168 // this lock just governs the lower half of this function
186186- r.extUserLk.Lock()
187187- defer r.extUserLk.Unlock()
169169+ r.accountLk.Lock()
170170+ defer r.accountLk.Unlock()
188171189172 if cachedAccount == nil {
190173 cachedAccount, err = r.GetAccount(ctx, did)
···206189 tx.Model(&models.Host{}).Where("id = ?", caHost).Update("account_count", gorm.Expr("account_count - 1"))
207190 }
208191 // update user's Host ID
209209- res := tx.Model(models.Account{}).Where("id = ?", cachedAccount.UID).Update("pds", canonicalHost.ID)
192192+ res := tx.Model(models.Account{}).Where("uid = ?", cachedAccount.UID).Update("host_id", canonicalHost.ID)
210193 if res.Error != nil {
211194 return fmt.Errorf("failed to update users pds: %w", res.Error)
212195 }
···217200218201 // XXX: cachedAccount.SetHost(canonicalHost.ID)
219202 cachedAccount.HostID = canonicalHost.ID
203203+204204+ // flush account cache
205205+ r.accountCache.Remove(did.String())
220206 }
221207 return cachedAccount, nil
222208 }
···259245 return err
260246 }
261247248248+ // clear account cache
249249+ r.accountCache.Remove(did.String())
250250+262251 // NOTE: not wiping events for user from persister (backfill window)
263252 return nil
264253}
···266255func (r *Relay) ListAccounts(ctx context.Context, cursor int64, limit int) ([]*models.Account, error) {
267256268257 accounts := []*models.Account{}
269269- if err := r.db.Model(&models.Account{}).Where("id > ? AND NOT taken_down AND (upstream_status IS NULL OR upstream_status = 'active')", cursor).Order("id").Limit(limit).Find(&accounts).Error; err != nil {
258258+ if err := r.db.Model(&models.Account{}).Where("uid > ? AND status IS NOT 'takendown' AND (upstream_status IS NULL OR upstream_status = 'active')", cursor).Order("uid").Limit(limit).Find(&accounts).Error; err != nil {
270259 return nil, err
271260 }
272261 return accounts, nil
···275264func (r *Relay) UpsertAccountRepo(uid uint64, rev syntax.TID, commitCID, commitDataCID cid.Cid) error {
276265 return r.db.Exec("INSERT INTO account_repo (uid, rev, commit_cid, commit_data) VALUES (?, ?, ?, ?) ON CONFLICT (uid) DO UPDATE SET rev = EXCLUDED.rev, commit_cid = EXCLUDED.commit_cid, commit_data = EXCLUDED.commit_data", uid, rev, commitCID.String(), commitDataCID.String()).Error
277266}
267267+268268+// this function with exact name and args implements the `diskpersist.UidSource` interface
269269+func (r *Relay) DidToUid(ctx context.Context, did string) (uint64, error) {
270270+ // NOTE: not re-parsing DID here (this function is called "loopback" from persister)
271271+ xu, err := r.GetAccount(ctx, syntax.DID(did))
272272+ if err != nil {
273273+ return 0, err
274274+ }
275275+ if xu == nil {
276276+ return 0, ErrAccountNotFound
277277+ }
278278+ return xu.UID, nil
279279+}
+162-302
cmd/relayered/relay/firehose.go
···2233import (
44 "context"
55- "errors"
65 "fmt"
66+ "net"
77+ "net/http"
88+ "sync"
79 "time"
81099- comatproto "github.com/bluesky-social/indigo/api/atproto"
1010- "github.com/bluesky-social/indigo/atproto/syntax"
1111- "github.com/bluesky-social/indigo/cmd/relayered/relay/models"
1211 "github.com/bluesky-social/indigo/cmd/relayered/stream"
13121414- "github.com/ipfs/go-cid"
1515- "go.opentelemetry.io/otel/attribute"
1616- "gorm.io/gorm"
1313+ "github.com/gorilla/websocket"
1414+ promclient "github.com/prometheus/client_golang/prometheus"
1515+ dto "github.com/prometheus/client_model/go"
1716)
18171919-// handleFedEvent() is the callback passed to Slurper called from Slurper.handleConnection()
2020-// XXX: evt not env
2121-func (r *Relay) handleFedEvent(ctx context.Context, host *models.Host, env *stream.XRPCStreamEvent) error {
2222- ctx, span := tracer.Start(ctx, "handleFedEvent")
2323- defer span.End()
1818+type SocketConsumer struct {
1919+ UserAgent string
2020+ RemoteAddr string
2121+ ConnectedAt time.Time
2222+ EventsSent promclient.Counter
2323+}
24242525- start := time.Now()
2626- defer func() {
2727- eventsHandleDuration.WithLabelValues(host.Hostname).Observe(time.Since(start).Seconds())
2828- }()
2525+func (r *Relay) registerConsumer(c *SocketConsumer) uint64 {
2626+ r.consumersLk.Lock()
2727+ defer r.consumersLk.Unlock()
29283030- EventsReceivedCounter.WithLabelValues(host.Hostname).Add(1)
2929+ id := r.nextConsumerID
3030+ r.nextConsumerID++
31313232- switch {
3333- case env.RepoCommit != nil:
3434- repoCommitsReceivedCounter.WithLabelValues(host.Hostname).Add(1)
3535- return r.handleCommit(ctx, host, env.RepoCommit)
3636- case env.RepoSync != nil:
3737- repoSyncReceivedCounter.WithLabelValues(host.Hostname).Add(1)
3838- return r.handleSync(ctx, host, env.RepoSync)
3939- case env.RepoHandle != nil:
4040- eventsWarningsCounter.WithLabelValues(host.Hostname, "handle").Add(1)
4141- // TODO: rate limit warnings per Host before we (temporarily?) block them
4242- return nil
4343- case env.RepoIdentity != nil:
4444- r.Logger.Info("relay got identity event", "did", env.RepoIdentity.Did)
3232+ r.consumers[id] = c
45334646- did, err := syntax.ParseDID(env.RepoIdentity.Did)
4747- if err != nil {
4848- return fmt.Errorf("invalid DID in message: %w", err)
4949- }
3434+ return id
3535+}
50365151- // Flush any cached DID documents for this user
5252- r.purgeDidCache(ctx, did.String())
3737+func (r *Relay) cleanupConsumer(id uint64) {
3838+ r.consumersLk.Lock()
3939+ defer r.consumersLk.Unlock()
53405454- // Refetch the DID doc and update our cached keys and handle etc.
5555- account, err := r.syncHostAccount(ctx, did, host, nil)
5656- if err != nil {
5757- return err
5858- }
4141+ c := r.consumers[id]
59426060- // Broadcast the identity event to all consumers
6161- err = r.Events.AddEvent(ctx, &stream.XRPCStreamEvent{
6262- RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{
6363- Did: did.String(),
6464- Seq: env.RepoIdentity.Seq,
6565- Time: env.RepoIdentity.Time,
6666- Handle: env.RepoIdentity.Handle,
6767- },
6868- PrivUid: account.UID,
6969- })
7070- if err != nil {
7171- r.Logger.Error("failed to broadcast Identity event", "error", err, "did", did)
7272- return fmt.Errorf("failed to broadcast Identity event: %w", err)
7373- }
4343+ var m = &dto.Metric{}
4444+ if err := c.EventsSent.Write(m); err != nil {
4545+ r.Logger.Error("failed to get sent counter", "err", err)
4646+ }
74477575- return nil
7676- case env.RepoAccount != nil:
7777- span.SetAttributes(
7878- attribute.String("did", env.RepoAccount.Did),
7979- attribute.Int64("seq", env.RepoAccount.Seq),
8080- attribute.Bool("active", env.RepoAccount.Active),
8181- )
4848+ r.Logger.Info("consumer disconnected",
4949+ "consumer_id", id,
5050+ "remote_addr", c.RemoteAddr,
5151+ "user_agent", c.UserAgent,
5252+ "events_sent", m.Counter.GetValue())
82538383- did, err := syntax.ParseDID(env.RepoAccount.Did)
8484- if err != nil {
8585- return fmt.Errorf("invalid DID in message: %w", err)
8686- }
5454+ delete(r.consumers, id)
5555+}
87568888- if env.RepoAccount.Status != nil {
8989- span.SetAttributes(attribute.String("repo_status", *env.RepoAccount.Status))
9090- }
9191- r.Logger.Info("relay got account event", "did", env.RepoAccount.Did)
5757+// Main HTTP request handler for clients connecting to the firehose (com.atproto.sync.subscribeRepos)
5858+func (r *Relay) HandleSubscribeRepos(resp http.ResponseWriter, req *http.Request, since *int64, realIP string) error {
92599393- if !env.RepoAccount.Active && env.RepoAccount.Status == nil {
9494- // TODO: semantics here aren't really clear
9595- r.Logger.Warn("dropping invalid account event", "did", env.RepoAccount.Did, "active", env.RepoAccount.Active, "status", env.RepoAccount.Status)
9696- accountVerifyWarnings.WithLabelValues(host.Hostname, "nostat").Inc()
9797- return nil
9898- }
6060+ ctx, cancel := context.WithCancel(req.Context())
6161+ defer cancel()
9962100100- // Flush any cached DID documents for this user
101101- r.purgeDidCache(ctx, did.String())
6363+ conn, err := websocket.Upgrade(resp, req, resp.Header(), 10<<10, 10<<10)
6464+ if err != nil {
6565+ return fmt.Errorf("upgrading websocket: %w", err)
6666+ }
10267103103- // Refetch the DID doc to make sure the Host is still authoritative
104104- account, err := r.syncHostAccount(ctx, did, host, nil)
105105- if err != nil {
106106- span.RecordError(err)
107107- return err
108108- }
6868+ defer conn.Close()
10969110110- // Check if the Host is still authoritative
111111- // if not we don't want to be propagating this account event
112112- // XXX: lock
113113- if account.HostID != host.ID && !r.Config.SkipAccountHostCheck {
114114- r.Logger.Error("account event from non-authoritative pds",
115115- "seq", env.RepoAccount.Seq,
116116- "did", env.RepoAccount.Did,
117117- "event_from", host.Hostname,
118118- "did_doc_declared_pds", account.HostID,
119119- "account_evt", env.RepoAccount,
120120- )
121121- return fmt.Errorf("event from non-authoritative pds")
122122- }
7070+ lastWriteLk := sync.Mutex{}
7171+ lastWrite := time.Now()
12372124124- // Process the account status change
125125- repoStatus := models.AccountStatusActive
126126- if !env.RepoAccount.Active && env.RepoAccount.Status != nil {
127127- repoStatus = models.AccountStatus(*env.RepoAccount.Status)
128128- }
7373+ // Start a goroutine to ping the client every 30 seconds to check if it's
7474+ // still alive. If the client doesn't respond to a ping within 5 seconds,
7575+ // we'll close the connection and teardown the consumer.
7676+ go func() {
7777+ ticker := time.NewTicker(30 * time.Second)
7878+ defer ticker.Stop()
12979130130- // XXX: lock, and parse
131131- account.UpstreamStatus = models.AccountStatus(repoStatus)
132132- err = r.db.Save(account).Error
133133- if err != nil {
134134- span.RecordError(err)
135135- return fmt.Errorf("failed to update account status: %w", err)
136136- }
8080+ for {
8181+ select {
8282+ case <-ticker.C:
8383+ lastWriteLk.Lock()
8484+ lw := lastWrite
8585+ lastWriteLk.Unlock()
13786138138- shouldBeActive := env.RepoAccount.Active
139139- status := env.RepoAccount.Status
8787+ if time.Since(lw) < 30*time.Second {
8888+ continue
8989+ }
14090141141- // override with local status
142142- // XXX: lock
143143- if account.Status == "takendown" {
144144- shouldBeActive = false
145145- s := string(models.AccountStatusTakendown)
146146- status = &s
9191+ if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second)); err != nil {
9292+ r.Logger.Warn("failed to ping client", "err", err)
9393+ cancel()
9494+ return
9595+ }
9696+ case <-ctx.Done():
9797+ return
9898+ }
14799 }
100100+ }()
148101149149- // Broadcast the account event to all consumers
150150- err = r.Events.AddEvent(ctx, &stream.XRPCStreamEvent{
151151- RepoAccount: &comatproto.SyncSubscribeRepos_Account{
152152- Active: shouldBeActive,
153153- Did: env.RepoAccount.Did,
154154- Seq: env.RepoAccount.Seq,
155155- Status: status,
156156- Time: env.RepoAccount.Time,
157157- },
158158- PrivUid: account.UID,
159159- })
160160- if err != nil {
161161- r.Logger.Error("failed to broadcast Account event", "error", err, "did", env.RepoAccount.Did)
162162- return fmt.Errorf("failed to broadcast Account event: %w", err)
102102+ conn.SetPingHandler(func(message string) error {
103103+ err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second*60))
104104+ if err == websocket.ErrCloseSent {
105105+ return nil
106106+ } else if e, ok := err.(net.Error); ok && e.Temporary() {
107107+ return nil
163108 }
109109+ return err
110110+ })
164111165165- return nil
166166- case env.RepoMigrate != nil:
167167- eventsWarningsCounter.WithLabelValues(host.Hostname, "migrate").Add(1)
168168- // TODO: rate limit warnings per Host before we (temporarily?) block them
169169- return nil
170170- case env.RepoTombstone != nil:
171171- eventsWarningsCounter.WithLabelValues(host.Hostname, "tombstone").Add(1)
172172- // TODO: rate limit warnings per Host before we (temporarily?) block them
173173- return nil
174174- default:
175175- return fmt.Errorf("invalid fed event")
176176- }
177177-}
112112+ // Start a goroutine to read messages from the client and discard them.
113113+ go func() {
114114+ for {
115115+ _, _, err := conn.ReadMessage()
116116+ if err != nil {
117117+ r.Logger.Warn("failed to read message from client", "err", err)
118118+ cancel()
119119+ return
120120+ }
121121+ }
122122+ }()
178123179179-func (r *Relay) handleCommit(ctx context.Context, host *models.Host, evt *comatproto.SyncSubscribeRepos_Commit) error {
180180- r.Logger.Debug("relay got repo append event", "seq", evt.Seq, "host", host.Hostname, "repo", evt.Repo)
124124+ ident := realIP + "-" + req.UserAgent()
181125182182- did, err := syntax.ParseDID(evt.Repo)
126126+ evts, cleanup, err := r.Events.Subscribe(ctx, ident, func(evt *stream.XRPCStreamEvent) bool { return true }, since)
183127 if err != nil {
184184- return fmt.Errorf("invalid DID in message: %w", err)
128128+ return err
185129 }
186186- // XXX: did = did.Normalize()
187187- account, err := r.GetAccount(ctx, did)
188188- if err != nil {
189189- if !errors.Is(err, gorm.ErrRecordNotFound) {
190190- repoCommitsResultCounter.WithLabelValues(host.Hostname, "nou").Inc()
191191- return fmt.Errorf("looking up event user: %w", err)
192192- }
130130+ defer cleanup()
193131194194- account, err = r.CreateAccount(ctx, host, did)
195195- if err != nil {
196196- repoCommitsResultCounter.WithLabelValues(host.Hostname, "nuerr").Inc()
197197- return err
198198- }
199199- }
200200- if account == nil {
201201- repoCommitsResultCounter.WithLabelValues(host.Hostname, "nou2").Inc()
202202- return ErrAccountNotFound
132132+ // Keep track of the consumer for metrics and admin endpoints
133133+ consumer := SocketConsumer{
134134+ RemoteAddr: realIP,
135135+ UserAgent: req.UserAgent(),
136136+ ConnectedAt: time.Now(),
203137 }
138138+ sentCounter := eventsSentCounter.WithLabelValues(consumer.RemoteAddr, consumer.UserAgent)
139139+ consumer.EventsSent = sentCounter
204140205205- // XXX: lock on account
206206- ustatus := account.UpstreamStatus
141141+ consumerID := r.registerConsumer(&consumer)
142142+ defer r.cleanupConsumer(consumerID)
207143208208- // XXX: lock on account
209209- if account.Status == models.AccountStatusTakendown || ustatus == models.AccountStatusTakendown {
210210- r.Logger.Debug("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "host", host.Hostname)
211211- repoCommitsResultCounter.WithLabelValues(host.Hostname, "tdu").Inc()
212212- return nil
213213- }
144144+ logger := r.Logger.With(
145145+ "consumer_id", consumerID,
146146+ "remote_addr", consumer.RemoteAddr,
147147+ "user_agent", consumer.UserAgent,
148148+ )
214149215215- if ustatus == models.AccountStatusSuspended {
216216- r.Logger.Debug("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "host", host.Hostname)
217217- repoCommitsResultCounter.WithLabelValues(host.Hostname, "susu").Inc()
218218- return nil
219219- }
220220-221221- if ustatus == models.AccountStatusDeactivated {
222222- r.Logger.Debug("dropping commit event from deactivated user", "did", evt.Repo, "seq", evt.Seq, "host", host.Hostname)
223223- repoCommitsResultCounter.WithLabelValues(host.Hostname, "du").Inc()
224224- return nil
225225- }
150150+ logger.Info("new consumer", "cursor", since)
226151227227- if evt.Rebase {
228228- repoCommitsResultCounter.WithLabelValues(host.Hostname, "rebase").Inc()
229229- return fmt.Errorf("rebase was true in event seq:%d,host:%s", evt.Seq, host.Hostname)
230230- }
152152+ for {
153153+ select {
154154+ case evt, ok := <-evts:
155155+ if !ok {
156156+ logger.Error("event stream closed unexpectedly")
157157+ return nil
158158+ }
231159232232- accountHostId := account.HostID
233233- if host.ID != accountHostId && accountHostId != 0 {
234234- r.Logger.Warn("received event for repo from different pds than expected", "repo", evt.Repo, "expPds", accountHostId, "gotPds", host.Hostname)
235235- // Flush any cached DID documents for this user
236236- r.purgeDidCache(ctx, evt.Repo)
160160+ wc, err := conn.NextWriter(websocket.BinaryMessage)
161161+ if err != nil {
162162+ logger.Error("failed to get next writer", "err", err)
163163+ return err
164164+ }
237165238238- account, err = r.syncHostAccount(ctx, did, host, account)
239239- if err != nil {
240240- repoCommitsResultCounter.WithLabelValues(host.Hostname, "uerr2").Inc()
241241- return err
242242- }
166166+ if evt.Preserialized != nil {
167167+ _, err = wc.Write(evt.Preserialized)
168168+ } else {
169169+ err = evt.Serialize(wc)
170170+ }
171171+ if err != nil {
172172+ return fmt.Errorf("failed to write event: %w", err)
173173+ }
243174244244- if account.HostID != host.ID && !r.Config.SkipAccountHostCheck {
245245- repoCommitsResultCounter.WithLabelValues(host.Hostname, "noauth").Inc()
246246- return fmt.Errorf("event from non-authoritative pds")
247247- }
248248- }
175175+ if err := wc.Close(); err != nil {
176176+ logger.Warn("failed to flush-close our event write", "err", err)
177177+ return nil
178178+ }
249179250250- // TODO: very messy fetch code here
251251- var repo *models.AccountRepo
252252- err = r.db.First(repo, account.UID).Error
253253- if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
254254- r.Logger.Error("failed to get previous root", "err", err)
255255- repo = nil
256256- }
257257- var prevRev *syntax.TID
258258- var prevData *cid.Cid
259259- if repo != nil {
260260- c, err := cid.Parse(repo.CommitData)
261261- if err != nil {
262262- return fmt.Errorf("parsing commitDataCID from database: %w", err)
180180+ lastWriteLk.Lock()
181181+ lastWrite = time.Now()
182182+ lastWriteLk.Unlock()
183183+ sentCounter.Inc()
184184+ case <-ctx.Done():
185185+ return nil
263186 }
264264- prevData = &c
265265- t := syntax.TID(repo.Rev)
266266- prevRev = &t
267187 }
268268- evtPrevDataStr := ""
269269- if evt.PrevData != nil {
270270- evtPrevDataStr = ((*cid.Cid)(evt.PrevData)).String()
271271- }
272272- commitDataCID, err := r.Validator.HandleCommit(ctx, host, account, evt, prevRev, prevData)
273273- if err != nil {
274274- // XXX: induction trace log
275275- r.Logger.Error("commit bad", "seq", evt.Seq, "host", host.Hostname, "repo", evt.Repo, "prev", evtPrevDataStr, "err", err)
276276- r.Logger.Warn("failed handling event", "err", err, "host", host.Hostname, "seq", evt.Seq, "repo", account.DID, "commit", evt.Commit.String())
277277- repoCommitsResultCounter.WithLabelValues(host.Hostname, "err").Inc()
278278- return fmt.Errorf("handle user event failed: %w", err)
279279- }
280280-281281- // TID syntax has been verified by validator
282282- rev := syntax.TID(evt.Rev)
283283-284284- err = r.UpsertAccountRepo(account.UID, rev, cid.Cid(evt.Commit), *commitDataCID)
285285- if err != nil {
286286- return fmt.Errorf("failed to set previous root uid=%d: %w", account.UID, err)
287287- }
288288-289289- repoCommitsResultCounter.WithLabelValues(host.Hostname, "ok").Inc()
290290-291291- // Broadcast the identity event to all consumers
292292- commitCopy := *evt
293293- err = r.Events.AddEvent(ctx, &stream.XRPCStreamEvent{
294294- RepoCommit: &commitCopy,
295295- PrivUid: account.UID,
296296- })
297297- if err != nil {
298298- r.Logger.Error("failed to broadcast commit event", "error", err, "did", evt.Repo)
299299- return fmt.Errorf("failed to broadcast commit event: %w", err)
300300- }
301301-302302- return nil
303188}
304189305305-// handleSync processes #sync messages
306306-func (r *Relay) handleSync(ctx context.Context, host *models.Host, evt *comatproto.SyncSubscribeRepos_Sync) error {
307307- did, err := syntax.ParseDID(evt.Did)
308308- if err != nil {
309309- return fmt.Errorf("invalid DID in message: %s", did)
310310- }
311311- // XXX: did.Normalize()
312312- account, err := r.GetAccount(ctx, did)
313313- if err != nil {
314314- if !errors.Is(err, gorm.ErrRecordNotFound) {
315315- repoCommitsResultCounter.WithLabelValues(host.Hostname, "nou").Inc()
316316- return fmt.Errorf("looking up event user: %w", err)
317317- }
318318-319319- account, err = r.CreateAccount(ctx, host, did)
320320- }
321321- if err != nil {
322322- return fmt.Errorf("could not get user for did %#v: %w", evt.Did, err)
323323- }
324324-325325- commitCID, commitDataCID, err := r.Validator.HandleSync(ctx, host, evt)
326326- if err != nil {
327327- return err
328328- }
329329- // TID syntax has been verified by validator
330330- rev := syntax.TID(evt.Rev)
331331-332332- // TODO: should this happen before or after firehose persist/broadcast?
333333- err = r.UpsertAccountRepo(account.UID, rev, *commitCID, *commitDataCID)
334334- if err != nil {
335335- return fmt.Errorf("failed to upsert repo state (uid %d): %w", account.UID, err)
336336- }
337337-338338- // Broadcast the sync event to all consumers
339339- evtCopy := *evt
340340- err = r.Events.AddEvent(ctx, &stream.XRPCStreamEvent{
341341- RepoSync: &evtCopy,
342342- })
343343- if err != nil {
344344- r.Logger.Error("failed to broadcast sync event", "error", err, "did", evt.Did)
345345- return fmt.Errorf("failed to broadcast sync event: %w", err)
346346- }
347347-348348- return nil
190190+type ConsumerInfo struct {
191191+ ID uint64 `json:"id"`
192192+ RemoteAddr string `json:"remote_addr"`
193193+ UserAgent string `json:"user_agent"`
194194+ EventsConsumed uint64 `json:"events_consumed"`
195195+ ConnectedAt time.Time `json:"connected_at"`
349196}
350197351351-func (r *Relay) purgeDidCache(ctx context.Context, did string) {
352352- ati, err := syntax.ParseAtIdentifier(did)
353353- if err != nil {
354354- return
198198+func (r *Relay) ListConsumers() []ConsumerInfo {
199199+ r.consumersLk.RLock()
200200+ defer r.consumersLk.RUnlock()
201201+202202+ info := make([]ConsumerInfo, 0, len(r.consumers))
203203+ for id, c := range r.consumers {
204204+ var m = &dto.Metric{}
205205+ if err := c.EventsSent.Write(m); err != nil {
206206+ continue
207207+ }
208208+ info = append(info, ConsumerInfo{
209209+ ID: id,
210210+ RemoteAddr: c.RemoteAddr,
211211+ UserAgent: c.UserAgent,
212212+ EventsConsumed: uint64(m.Counter.GetValue()),
213213+ ConnectedAt: c.ConnectedAt,
214214+ })
355215 }
356356- _ = r.dir.Purge(ctx, *ati)
216216+ return info
357217}
+384
cmd/relayered/relay/ingest.go
···11+package relay
22+33+import (
44+ "context"
55+ "errors"
66+ "fmt"
77+ "time"
88+99+ comatproto "github.com/bluesky-social/indigo/api/atproto"
1010+ "github.com/bluesky-social/indigo/atproto/syntax"
1111+ "github.com/bluesky-social/indigo/cmd/relayered/relay/models"
1212+ "github.com/bluesky-social/indigo/cmd/relayered/stream"
1313+1414+ "github.com/ipfs/go-cid"
1515+ "go.opentelemetry.io/otel/attribute"
1616+ "gorm.io/gorm"
1717+)
1818+1919+// This callback function gets called by Slurper on every upstream repo stream message from any host.
2020+//
2121+// Messages are processed in-order for a single account on a single host; but may be concurrent or out-of-order for the same account *across* hosts (eg, during account migration or a conflict)
2222+func (r *Relay) processRepoEvent(ctx context.Context, evt *stream.XRPCStreamEvent, hostname string, hostID uint64) error {
2323+ ctx, span := tracer.Start(ctx, "processRepoEvent")
2424+ defer span.End()
2525+2626+ start := time.Now()
2727+ defer func() {
2828+ eventsHandleDuration.WithLabelValues(hostname).Observe(time.Since(start).Seconds())
2929+ }()
3030+3131+ EventsReceivedCounter.WithLabelValues(hostname).Add(1)
3232+3333+ switch {
3434+ case evt.RepoCommit != nil:
3535+ repoCommitsReceivedCounter.WithLabelValues(hostname).Add(1)
3636+ return r.processCommitEvent(ctx, evt.RepoCommit, hostname, hostID)
3737+ case evt.RepoSync != nil:
3838+ repoSyncReceivedCounter.WithLabelValues(hostname).Add(1)
3939+ return r.processSyncEvent(ctx, evt.RepoSync, hostname, hostID)
4040+ case evt.RepoIdentity != nil:
4141+ //repoIdentityReceivedCounter.WithLabelValues(hostname).Add(1)
4242+ return r.processIdentityEvent(ctx, evt.RepoIdentity, hostname, hostID)
4343+ case evt.RepoAccount != nil:
4444+ //repoAccountReceivedCounter.WithLabelValues(hostname).Add(1)
4545+ return r.processAccountEvent(ctx, evt.RepoAccount, hostname, hostID)
4646+ case evt.RepoHandle != nil: // DEPRECATED
4747+ eventsWarningsCounter.WithLabelValues(hostname, "handle").Add(1)
4848+ return nil
4949+ case evt.RepoMigrate != nil: // DEPRECATED
5050+ eventsWarningsCounter.WithLabelValues(hostname, "migrate").Add(1)
5151+ return nil
5252+ case evt.RepoTombstone != nil: // DEPRECATED
5353+ eventsWarningsCounter.WithLabelValues(hostname, "tombstone").Add(1)
5454+ return nil
5555+ default:
5656+ return fmt.Errorf("unhandled repo stream event type")
5757+ }
5858+}
5959+6060+func (r *Relay) processCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit, hostname string, hostID uint64) error {
6161+ logger := r.Logger.With("did", evt.Repo, "seq", evt.Seq, "host", hostname, "eventType", "commit", "rev", evt.Rev)
6262+ logger.Debug("relay got repo append event")
6363+6464+ did, err := syntax.ParseDID(evt.Repo)
6565+ if err != nil {
6666+ return fmt.Errorf("invalid DID in message: %w", err)
6767+ }
6868+ // XXX: did = did.Normalize()
6969+ account, err := r.GetAccount(ctx, did)
7070+ if err != nil {
7171+ if !errors.Is(err, gorm.ErrRecordNotFound) {
7272+ return fmt.Errorf("looking up event user: %w", err)
7373+ }
7474+7575+ host, err := r.GetHost(ctx, hostID)
7676+ if err != nil {
7777+ return err
7878+ }
7979+ account, err = r.CreateAccount(ctx, host, did)
8080+ if err != nil {
8181+ return err
8282+ }
8383+ }
8484+ if account == nil {
8585+ return ErrAccountNotFound
8686+ }
8787+8888+ // XXX: lock on account
8989+ ustatus := account.UpstreamStatus
9090+9191+ // XXX: lock on account
9292+ if account.Status == models.AccountStatusTakendown || ustatus == models.AccountStatusTakendown {
9393+ logger.Debug("dropping commit event from taken down user")
9494+ return nil
9595+ }
9696+9797+ if ustatus == models.AccountStatusSuspended {
9898+ logger.Debug("dropping commit event from suspended user")
9999+ return nil
100100+ }
101101+102102+ if ustatus == models.AccountStatusDeactivated {
103103+ logger.Debug("dropping commit event from deactivated user")
104104+ return nil
105105+ }
106106+107107+ if evt.Rebase {
108108+ return fmt.Errorf("rebase was true in event seq:%d,host:%s", evt.Seq, hostname)
109109+ }
110110+111111+ accountHostId := account.HostID
112112+ if hostID != accountHostId && accountHostId != 0 {
113113+ // XXX: metter logging
114114+ logger.Warn("received event for repo from different pds than expected", "expectedHostID", accountHostId, "receivedHost", hostname)
115115+ // Flush any cached DID documents for this user
116116+ err = r.dir.Purge(ctx, did.AtIdentifier())
117117+ if err != nil {
118118+ logger.Error("problem purging identity directory cache", "err", err)
119119+ }
120120+121121+ // XXX: shouldn't need full Host?
122122+ host, err := r.GetHost(ctx, hostID)
123123+ if err != nil {
124124+ return err
125125+ }
126126+127127+ account, err = r.syncHostAccount(ctx, did, host, account)
128128+ if err != nil {
129129+ return err
130130+ }
131131+132132+ if account.HostID != hostID && !r.Config.SkipAccountHostCheck {
133133+ return fmt.Errorf("event from non-authoritative pds")
134134+ }
135135+ }
136136+137137+ // TODO: very messy fetch code here
138138+ var repo *models.AccountRepo
139139+ err = r.db.First(repo, account.UID).Error
140140+ if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
141141+ logger.Error("failed to get previous root", "err", err)
142142+ repo = nil
143143+ }
144144+ var prevRev *syntax.TID
145145+ var prevData *cid.Cid
146146+ if repo != nil {
147147+ c, err := cid.Parse(repo.CommitData)
148148+ if err != nil {
149149+ return fmt.Errorf("parsing commitDataCID from database: %w", err)
150150+ }
151151+ prevData = &c
152152+ t := syntax.TID(repo.Rev)
153153+ prevRev = &t
154154+ }
155155+ evtPrevDataStr := ""
156156+ if evt.PrevData != nil {
157157+ evtPrevDataStr = ((*cid.Cid)(evt.PrevData)).String()
158158+ }
159159+ commitDataCID, err := r.Validator.HandleCommit(ctx, hostname, account, evt, prevRev, prevData)
160160+ if err != nil {
161161+ // XXX: induction trace log
162162+ logger.Error("commit bad", "prevData", evtPrevDataStr, "err", err)
163163+ logger.Warn("failed handling event", "err", err, "commitCID", evt.Commit.String())
164164+ return fmt.Errorf("handle user event failed: %w", err)
165165+ }
166166+167167+ // TID syntax has been verified by validator
168168+ rev := syntax.TID(evt.Rev)
169169+170170+ err = r.UpsertAccountRepo(account.UID, rev, cid.Cid(evt.Commit), *commitDataCID)
171171+ if err != nil {
172172+ return fmt.Errorf("failed to set previous root uid=%d: %w", account.UID, err)
173173+ }
174174+175175+ // Broadcast the identity event to all consumers
176176+ commitCopy := *evt
177177+ err = r.Events.AddEvent(ctx, &stream.XRPCStreamEvent{
178178+ RepoCommit: &commitCopy,
179179+ PrivUid: account.UID,
180180+ })
181181+ if err != nil {
182182+ logger.Error("failed to broadcast commit event", "error", err)
183183+ return fmt.Errorf("failed to broadcast commit event: %w", err)
184184+ }
185185+186186+ return nil
187187+}
188188+189189+func (r *Relay) processSyncEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Sync, hostname string, hostID uint64) error {
190190+ logger := r.Logger.With("did", evt.Did, "seq", evt.Seq, "host", hostname, "eventType", "sync")
191191+ did, err := syntax.ParseDID(evt.Did)
192192+ if err != nil {
193193+ return fmt.Errorf("invalid DID in message: %s", did)
194194+ }
195195+ // XXX: did.Normalize()
196196+ account, err := r.GetAccount(ctx, did)
197197+ if err != nil {
198198+ if !errors.Is(err, gorm.ErrRecordNotFound) {
199199+ return fmt.Errorf("looking up event user: %w", err)
200200+ }
201201+202202+ host, err := r.GetHost(ctx, hostID)
203203+ if err != nil {
204204+ return err
205205+ }
206206+ account, err = r.CreateAccount(ctx, host, did)
207207+ }
208208+ if err != nil {
209209+ return fmt.Errorf("could not get user for did %#v: %w", evt.Did, err)
210210+ }
211211+212212+ commitCID, commitDataCID, err := r.Validator.HandleSync(ctx, hostname, evt)
213213+ if err != nil {
214214+ return err
215215+ }
216216+ // TID syntax has been verified by validator
217217+ rev := syntax.TID(evt.Rev)
218218+219219+ // TODO: should this happen before or after firehose persist/broadcast?
220220+ err = r.UpsertAccountRepo(account.UID, rev, *commitCID, *commitDataCID)
221221+ if err != nil {
222222+ return fmt.Errorf("failed to upsert repo state (uid %d): %w", account.UID, err)
223223+ }
224224+225225+ // Broadcast the sync event to all consumers
226226+ evtCopy := *evt
227227+ err = r.Events.AddEvent(ctx, &stream.XRPCStreamEvent{
228228+ RepoSync: &evtCopy,
229229+ })
230230+ if err != nil {
231231+ logger.Error("failed to broadcast sync event", "error", err)
232232+ return fmt.Errorf("failed to broadcast sync event: %w", err)
233233+ }
234234+235235+ return nil
236236+}
237237+238238+func (r *Relay) processIdentityEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity, hostname string, hostID uint64) error {
239239+ logger := r.Logger.With("did", evt.Did, "seq", evt.Seq, "host", hostname, "eventType", "identity")
240240+ logger.Info("relay got identity event")
241241+242242+ did, err := syntax.ParseDID(evt.Did)
243243+ if err != nil {
244244+ return fmt.Errorf("invalid DID in message: %w", err)
245245+ }
246246+247247+ // Flush any cached DID documents for this user
248248+ r.dir.Purge(ctx, did.AtIdentifier())
249249+ if err != nil {
250250+ logger.Error("problem purging identity directory cache", "err", err)
251251+ }
252252+253253+ // XXX: syncHostAccount doesn't need full Host?
254254+ host, err := r.GetHost(ctx, hostID)
255255+ if err != nil {
256256+ return err
257257+ }
258258+259259+ // Refetch the DID doc and update our cached keys and handle etc.
260260+ account, err := r.syncHostAccount(ctx, did, host, nil)
261261+ if err != nil {
262262+ return err
263263+ }
264264+265265+ // Broadcast the identity event to all consumers
266266+ err = r.Events.AddEvent(ctx, &stream.XRPCStreamEvent{
267267+ RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{
268268+ Did: did.String(),
269269+ Seq: evt.Seq,
270270+ Time: evt.Time,
271271+ Handle: evt.Handle,
272272+ },
273273+ PrivUid: account.UID,
274274+ })
275275+ if err != nil {
276276+ logger.Error("failed to broadcast Identity event", "error", err)
277277+ return fmt.Errorf("failed to broadcast Identity event: %w", err)
278278+ }
279279+280280+ return nil
281281+}
282282+283283+func (r *Relay) processAccountEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Account, hostname string, hostID uint64) error {
284284+ logger := r.Logger.With("did", evt.Did, "seq", evt.Seq, "host", hostname, "eventType", "account")
285285+286286+ ctx, span := tracer.Start(ctx, "processAccountEvent")
287287+ defer span.End()
288288+ span.SetAttributes(
289289+ attribute.String("did", evt.Did),
290290+ attribute.Int64("seq", evt.Seq),
291291+ attribute.Bool("active", evt.Active),
292292+ )
293293+294294+ did, err := syntax.ParseDID(evt.Did)
295295+ if err != nil {
296296+ return fmt.Errorf("invalid DID in message: %w", err)
297297+ }
298298+299299+ if evt.Status != nil {
300300+ span.SetAttributes(attribute.String("repo_status", *evt.Status))
301301+ }
302302+ logger.Info("relay got account event")
303303+304304+ if !evt.Active && evt.Status == nil {
305305+ // TODO: semantics here aren't really clear
306306+ logger.Warn("dropping invalid account event", "active", evt.Active, "status", evt.Status)
307307+ accountVerifyWarnings.WithLabelValues(hostname, "nostat").Inc()
308308+ return nil
309309+ }
310310+311311+ // Flush any cached DID documents for this user
312312+ r.dir.Purge(ctx, did.AtIdentifier())
313313+ if err != nil {
314314+ logger.Error("problem purging identity directory cache", "err", err)
315315+ }
316316+317317+ // XXX: shouldn't need full host?
318318+ host, err := r.GetHost(ctx, hostID)
319319+ if err != nil {
320320+ return err
321321+ }
322322+323323+ // Refetch the DID doc to make sure the Host is still authoritative
324324+ account, err := r.syncHostAccount(ctx, did, host, nil)
325325+ if err != nil {
326326+ span.RecordError(err)
327327+ return err
328328+ }
329329+330330+ // Check if the Host is still authoritative
331331+ // if not we don't want to be propagating this account event
332332+ // XXX: lock
333333+ if account.HostID != hostID && !r.Config.SkipAccountHostCheck {
334334+ logger.Error("account event from non-authoritative pds",
335335+ "event_from", hostname,
336336+ "did_doc_declared_pds", account.HostID,
337337+ "account_evt", evt,
338338+ )
339339+ return fmt.Errorf("event from non-authoritative pds")
340340+ }
341341+342342+ // Process the account status change
343343+ repoStatus := models.AccountStatusActive
344344+ if !evt.Active && evt.Status != nil {
345345+ repoStatus = models.AccountStatus(*evt.Status)
346346+ }
347347+348348+ // XXX: lock, and parse
349349+ account.UpstreamStatus = models.AccountStatus(repoStatus)
350350+ err = r.db.Save(account).Error
351351+ if err != nil {
352352+ span.RecordError(err)
353353+ return fmt.Errorf("failed to update account status: %w", err)
354354+ }
355355+356356+ shouldBeActive := evt.Active
357357+ status := evt.Status
358358+359359+ // override with local status
360360+ // XXX: lock
361361+ if account.Status == "takendown" {
362362+ shouldBeActive = false
363363+ s := string(models.AccountStatusTakendown)
364364+ status = &s
365365+ }
366366+367367+ // Broadcast the account event to all consumers
368368+ err = r.Events.AddEvent(ctx, &stream.XRPCStreamEvent{
369369+ RepoAccount: &comatproto.SyncSubscribeRepos_Account{
370370+ Active: shouldBeActive,
371371+ Did: evt.Did,
372372+ Seq: evt.Seq,
373373+ Status: status,
374374+ Time: evt.Time,
375375+ },
376376+ PrivUid: account.UID,
377377+ })
378378+ if err != nil {
379379+ logger.Error("failed to broadcast Account event", "error", err)
380380+ return fmt.Errorf("failed to broadcast Account event: %w", err)
381381+ }
382382+383383+ return nil
384384+}
+2
cmd/relayered/relay/metrics.go
···3131 Help: "The total number of sync events received",
3232}, []string{"pds"})
33333434+/* XXX
3435var repoCommitsResultCounter = promauto.NewCounterVec(prometheus.CounterOpts{
3536 Name: "repo_commits_result_counter",
3637 Help: "The results of commit events received",
3738}, []string{"pds", "status"})
3939+*/
38403941var eventsSentCounter = promauto.NewCounterVec(prometheus.CounterOpts{
4042 Name: "events_sent_counter",
+3-3
cmd/relayered/relay/relay.go
···2626 HostChecker HostChecker
2727 Config RelayConfig
28282929- // extUserLk serializes a section of syncHostAccount()
2929+ // accountLk serializes a section of syncHostAccount()
3030 // TODO: at some point we will want to lock specific DIDs, this lock as is
3131 // is overly broad, but i dont expect it to be a bottleneck for now
3232- extUserLk sync.Mutex
3232+ accountLk sync.Mutex
33333434 // Management of Socket Consumers
3535 consumersLk sync.RWMutex
···9696 slOpts.DefaultRepoLimit = config.DefaultRepoLimit
9797 slOpts.ConcurrencyPerHost = config.ConcurrencyPerHost
9898 slOpts.MaxQueuePerHost = config.MaxQueuePerHost
9999- s, err := NewSlurper(db, r.handleFedEvent, slOpts, r.Logger)
9999+ s, err := NewSlurper(db, r.processRepoEvent, slOpts, r.Logger)
100100 if err != nil {
101101 return nil, err
102102 }