this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

handle identity evts

dholms e74ab5e5 5105e6d4

+134 -50
+2 -2
nexus/handlers.go
··· 35 35 36 36 n.logger.Info("websocket connected") 37 37 38 - return n.outbox.Subscribe(c.Request().Context(), func(op *Op) error { 39 - return ws.WriteJSON(op) 38 + return n.outbox.Subscribe(c.Request().Context(), func(evt *OutboxEvt) error { 39 + return ws.WriteJSON(evt) 40 40 }) 41 41 } 42 42
+1
nexus/models/models.go
··· 13 13 type Repo struct { 14 14 Did string `gorm:"primaryKey"` 15 15 State RepoState `gorm:"not null;default:'pending';index"` 16 + Handle string `gorm:"type:text"` 16 17 Rev string `gorm:"type:text"` 17 18 PrevData string `gorm:"type:text"` 18 19 ErrorMsg string `gorm:"type:text"`
+4 -6
nexus/nexus.go
··· 51 51 e.HideBanner = true 52 52 53 53 bdir := identity.BaseDirectory{ 54 - SkipHandleVerification: true, 55 - TryAuthoritativeDNS: false, 56 - SkipDNSDomainSuffixes: []string{".bsky.social"}, 54 + TryAuthoritativeDNS: false, 55 + SkipDNSDomainSuffixes: []string{".bsky.social"}, 57 56 } 58 - cdir := identity.NewCacheDirectory(&bdir, 1_000_000, time.Hour*24, time.Minute*2, time.Minute*5) 57 + cdir := identity.NewCacheDirectory(&bdir, 2_000_000, time.Hour*24, time.Minute*2, time.Minute*5) 59 58 60 59 n := &Nexus{ 61 60 db: db, ··· 98 97 return n.EventProcessor.ProcessSync(context.Background(), evt) 99 98 }, 100 99 RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 101 - // @TODO 102 - return nil 100 + return n.EventProcessor.ProcessIdentity(context.Background(), evt) 103 101 }, 104 102 RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error { 105 103 // @TODO
+28 -15
nexus/outbox.go
··· 11 11 12 12 type Outbox struct { 13 13 db *gorm.DB 14 - outCh chan *Op 14 + outCh chan *OutboxEvt 15 15 logger *slog.Logger 16 16 } 17 17 18 18 func NewOutbox(db *gorm.DB) *Outbox { 19 19 return &Outbox{ 20 20 db: db, 21 - outCh: make(chan *Op, 100), 21 + outCh: make(chan *OutboxEvt, 100), 22 22 logger: slog.Default().With("system", "outbox"), 23 23 } 24 24 } 25 25 26 - func (o *Outbox) Subscribe(ctx context.Context, send func(op *Op) error) error { 26 + func (o *Outbox) Subscribe(ctx context.Context, send func(evt *OutboxEvt) error) error { 27 27 var bufferedEvts []models.OutboxBuffer 28 28 if err := o.db.Order("id ASC").Find(&bufferedEvts).Error; err != nil { 29 29 o.logger.Error("failed to load buffered events", "error", err) ··· 33 33 if len(bufferedEvts) > 0 { 34 34 o.logger.Info("draining buffered events", "count", len(bufferedEvts)) 35 35 for _, evt := range bufferedEvts { 36 - // Unmarshal buffered JSON back to Commit 37 - var op Op 38 - if err := json.Unmarshal([]byte(evt.Data), &op); err != nil { 39 - o.logger.Error("failed to unmarshal buffered op", "error", err, "id", evt.ID) 36 + var outboxEvt OutboxEvt 37 + if err := json.Unmarshal([]byte(evt.Data), &outboxEvt); err != nil { 38 + o.logger.Error("failed to unmarshal buffered event", "error", err, "id", evt.ID) 40 39 continue 41 40 } 42 41 43 - if err := send(&op); err != nil { 42 + if err := send(&outboxEvt); err != nil { 44 43 o.logger.Info("send error during drain", "error", err) 45 44 return err 46 45 } ··· 58 57 select { 59 58 case <-ctx.Done(): 60 59 return ctx.Err() 61 - case op := <-o.outCh: 62 - if err := send(op); err != nil { 60 + case evt := <-o.outCh: 61 + if err := send(evt); err != nil { 63 62 o.logger.Info("send error during live stream", "error", err) 64 63 return err 65 64 } ··· 67 66 } 68 67 } 69 68 70 - func (o *Outbox) Send(op *Op) error { 69 + func (o *Outbox) SendRecordEvt(evt *RecordEvt) error { 70 + return o.SendOutboxEvt(&OutboxEvt{ 71 + Type: "record", 72 + RecordEvt: evt, 73 + }) 74 + } 75 + 76 + func (o *Outbox) SendUserEvt(evt *UserEvt) error { 77 + return o.SendOutboxEvt(&OutboxEvt{ 78 + Type: "user", 79 + UserEvt: evt, 80 + }) 81 + } 82 + 83 + func (o *Outbox) SendOutboxEvt(evt *OutboxEvt) error { 71 84 select { 72 - case o.outCh <- op: 85 + case o.outCh <- evt: 73 86 return nil 74 87 default: 75 - return o.bufferToDB(op) 88 + return o.bufferEvent(evt) 76 89 } 77 90 } 78 91 79 - func (o *Outbox) bufferToDB(op *Op) error { 80 - jsonData, err := json.Marshal(op) 92 + func (o *Outbox) bufferEvent(evt *OutboxEvt) error { 93 + jsonData, err := json.Marshal(evt) 81 94 if err != nil { 82 95 return err 83 96 }
+77 -19
nexus/processor.go
··· 28 28 seqMu sync.Mutex 29 29 } 30 30 31 + func (ep *EventProcessor) ProcessIdentity(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) error { 32 + defer ep.trackLastSeq(evt.Seq) 33 + 34 + curr, err := ep.GetRepoState(evt.Did) 35 + if err != nil { 36 + return err 37 + } else if curr == nil { 38 + return nil 39 + } 40 + 41 + did := syntax.DID(evt.Did) 42 + if err := ep.Dir.Purge(ctx, did.AtIdentifier()); err != nil { 43 + ep.Logger.Error("failed to purge identity cache", "did", evt.Did, "error", err) 44 + } 45 + 46 + id, err := ep.Dir.LookupDID(ctx, did) 47 + if err != nil { 48 + return err 49 + } 50 + 51 + handleStr := id.Handle.String() 52 + if handleStr == curr.Handle { 53 + return nil 54 + } 55 + 56 + if evt.Handle == nil || *evt.Handle == "handle.invalid" { 57 + return nil 58 + } 59 + 60 + userEvt := &UserEvt{ 61 + Did: evt.Did, 62 + Handle: handleStr, 63 + } 64 + 65 + if err := ep.Outbox.SendUserEvt(userEvt); err != nil { 66 + ep.Logger.Error("failed to send user evt", "did", evt.Did, "error", err) 67 + return err 68 + } 69 + 70 + if err := ep.DB.Model(&models.Repo{}). 71 + Where("did = ?", evt.Did). 72 + Update("handle", handleStr).Error; err != nil { 73 + ep.Logger.Error("failed to update handle", "did", evt.Did, "handle", handleStr, "error", err) 74 + return err 75 + } 76 + 77 + return nil 78 + } 79 + 31 80 func (ep *EventProcessor) ProcessSync(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Sync) error { 32 81 defer ep.trackLastSeq(evt.Seq) 33 82 34 - var curr models.Repo 35 - if err := ep.DB.First(&curr, "did = ?", evt.Did).Error; err != nil { 36 - if err != gorm.ErrRecordNotFound { 37 - ep.Logger.Error("failed to get repo state", "did", evt.Did, "error", err) 38 - } 83 + curr, err := ep.GetRepoState(evt.Did) 84 + if err != nil { 85 + return err 86 + } else if curr == nil { 39 87 return nil 40 88 } 41 89 ··· 69 117 func (ep *EventProcessor) ProcessCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 70 118 defer ep.trackLastSeq(evt.Seq) 71 119 72 - var d models.Repo 73 - if err := ep.DB.First(&d, "did = ?", evt.Repo).Error; err != nil { 74 - if err != gorm.ErrRecordNotFound { 75 - ep.Logger.Error("failed to get repo state", "did", evt.Repo, "error", err) 76 - } 120 + curr, err := ep.GetRepoState(evt.Repo) 121 + if err != nil { 122 + return err 123 + } else if curr == nil { 77 124 return nil 78 125 } 79 126 80 - if d.State != models.RepoStateActive && d.State != models.RepoStateResyncing { 127 + if curr.State != models.RepoStateActive && curr.State != models.RepoStateResyncing { 81 128 return nil 82 129 } 83 130 84 - if d.Rev != "" && evt.Rev <= d.Rev { 85 - ep.Logger.Debug("skipping replayed event", "did", evt.Repo, "eventRev", evt.Rev, "currentRev", d.Rev) 131 + if curr.Rev != "" && evt.Rev <= curr.Rev { 132 + ep.Logger.Debug("skipping replayed event", "did", evt.Repo, "eventRev", evt.Rev, "currentRev", curr.Rev) 86 133 return nil 87 134 } 88 135 89 136 if evt.PrevData == nil { 90 137 ep.Logger.Debug("legacy commit event, skipping prev data check", "did", evt.Repo, "rev", evt.Rev) 91 - } else if evt.PrevData.String() != d.PrevData { 138 + } else if evt.PrevData.String() != curr.PrevData { 92 139 ep.Logger.Warn("repo state desynced", "did", evt.Repo, "rev", evt.Rev) 93 140 // gets picked up by resync workers 94 141 if err := ep.UpdateRepoState(evt.Repo, models.RepoStateDesynced); err != nil { ··· 104 151 return err 105 152 } 106 153 107 - if d.State == models.RepoStateResyncing { 154 + if curr.State == models.RepoStateResyncing { 108 155 if err := ep.addToResyncBuffer(commit); err != nil { 109 156 ep.Logger.Error("failed to buffer commit", "did", evt.Repo, "error", err) 110 157 return err 111 158 } 112 159 } 113 160 114 - for _, op := range commit.ToOps() { 115 - if err := ep.Outbox.Send(op); err != nil { 161 + for _, recEvt := range commit.ToEvts() { 162 + if err := ep.Outbox.SendRecordEvt(recEvt); err != nil { 116 163 ep.Logger.Error("failed to send to outbox", "did", commit.Did, "rev", commit.Rev, "error", err) 117 164 return err 118 165 } ··· 250 297 return fmt.Errorf("failed to unmarshal buffered event: %w", err) 251 298 } 252 299 253 - for _, op := range commit.ToOps() { 254 - if err := ep.Outbox.Send(op); err != nil { 300 + for _, recEvt := range commit.ToEvts() { 301 + if err := ep.Outbox.SendRecordEvt(recEvt); err != nil { 255 302 ep.Logger.Error("failed to send to outbox", "did", commit.Did, "rev", commit.Rev, "error", err) 256 303 return err 257 304 } ··· 322 369 return 0, err 323 370 } 324 371 return cursor.Cursor, nil 372 + } 373 + 374 + func (ep *EventProcessor) GetRepoState(did string) (*models.Repo, error) { 375 + var r models.Repo 376 + if err := ep.DB.First(&r, "did = ?", did).Error; err != nil { 377 + if err != gorm.ErrRecordNotFound { 378 + return nil, err 379 + } 380 + return nil, nil 381 + } 382 + return &r, nil 325 383 } 326 384 327 385 func (ep *EventProcessor) UpdateRepoState(did string, state models.RepoState) error {
+3 -3
nexus/resync.go
··· 168 168 return nil 169 169 } 170 170 171 - op := &Op{ 171 + evt := &RecordEvt{ 172 172 Did: did, 173 173 Collection: collStr, 174 174 Rkey: rkeyStr, ··· 177 177 Cid: recCid.String(), 178 178 } 179 179 180 - if err := n.outbox.Send(op); err != nil { 181 - return fmt.Errorf("failed to send op: %w", err) 180 + if err := n.outbox.SendRecordEvt(evt); err != nil { 181 + return fmt.Errorf("failed to send evt: %w", err) 182 182 } 183 183 184 184 repoRecord := models.RepoRecord{
+19 -5
nexus/types.go
··· 15 15 Cid string `json:"cid,omitempty"` 16 16 } 17 17 18 - func (c *Commit) ToOps() []*Op { 19 - var ops []*Op 18 + func (c *Commit) ToEvts() []*RecordEvt { 19 + var evts []*RecordEvt 20 20 for _, op := range c.Ops { 21 - ops = append(ops, &Op{ 21 + evts = append(evts, &RecordEvt{ 22 22 Did: c.Did, 23 23 Rev: c.Rev, 24 24 Collection: op.Collection, ··· 28 28 Cid: op.Cid, 29 29 }) 30 30 } 31 - return ops 31 + return evts 32 32 } 33 33 34 - type Op struct { 34 + type RecordEvt struct { 35 35 Did string `json:"did"` 36 36 Rev string `json:"rev"` 37 37 Collection string `json:"collection"` ··· 40 40 Record map[string]interface{} `json:"record,omitempty"` 41 41 Cid string `json:"cid,omitempty"` 42 42 } 43 + 44 + type UserEvt struct { 45 + Did string `json:"did"` 46 + Handle string `json:"handle"` 47 + Pds string `json:"pds"` 48 + IsActive string `json:"is_active"` 49 + Status string `json:"status"` 50 + } 51 + 52 + type OutboxEvt struct { 53 + Type string `json:"type"` 54 + RecordEvt *RecordEvt `json:"evt,omitempty"` 55 + UserEvt *UserEvt `json:"evt,omitempty"` 56 + }